Tim's Tech Thoughts

Remediating Unencrypted EBS Volumes: Encryption in Action

2024-08-27 AWS Timothy Patterson

Recap: Preparing for Full Encryption

In the last two posts, I discussed the importance of encrypting data at rest and how to identify unencrypted EBS volumes attached to EC2 instances by using a Python script. After generating a report, I outlined the initial steps for remediation, which included gathering information about unencrypted volumes.

Now it’s time to take action. In this post, I’ll walk through actually encrypting those volumes with the encrypt-ec2-ebs-vols.py Python script, which automates the whole process for a single EC2 instance. Two things to note up front: the script processes every volume attached to the instance (it does not skip volumes that are already encrypted), and the instance is powered off while it runs. Instances that were running at the start are powered back on at the end; instances that were already stopped are left stopped.

Introducing the encrypt-ec2-ebs-vols.py Script

Remember, the steps required to encrypt an EBS volume that is attached to an EC2 instance are:

Determine KMS Key to Use: It is very important to have your destination in mind. If your goal is simplicity, you can use the AWS managed default key for EBS (alias aws/ebs). However, if you plan to use something like the AWS Backup service to copy recovery points containing these volumes across AWS accounts or AWS regions, you will need a customer managed key instead, because the AWS managed key cannot be shared with other accounts. For that case, I highly recommend creating a new multi-region KMS key exclusively for this purpose.

Power Down the EC2 Instance: For safety and data integrity, stop the EC2 instance so nothing can write to the volumes during the process. It is important to understand that an existing volume cannot be encrypted in place while it is live; you must plan for downtime. The amount of downtime for each instance depends on a number of factors, such as the number of EBS volumes attached, the EBS volume type, the volume size, and how much of each volume is in use (in terms of blocks written).

Detach the EBS Volumes: Using the AWS Management Console or AWS CLI, detach the unencrypted EBS volumes from the instance.

Create an Unencrypted Snapshot: Before making any modifications, create a snapshot of the unencrypted volume. This acts as a backup and ensures that no data is lost during the process.

Copy the Snapshot with Encryption: Create a new encrypted snapshot from the unencrypted one. This step uses your KMS key to encrypt the data.

Create a New EBS Volume: Once you have the encrypted snapshot, create a new EBS volume from it.

Reattach the Encrypted EBS Volume: Attach the newly encrypted EBS volume to your EC2 instance.

Power the EC2 Instance Back On: Finally, start the instance again. It now has encrypted EBS volumes attached.
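
Since the steps above target unencrypted volumes, it can help to filter an instance's volumes before detaching anything. Here is a minimal sketch, assuming input shaped like the "Volumes" list in an EC2 describe_volumes response. The function name unencrypted_device_map and the sample IDs are made up for illustration and are not part of the script in this post.

```python
def unencrypted_device_map(volumes, instance_id):
    """Return {device_name: volume_id} for unencrypted volumes on instance_id.

    `volumes` is assumed to look like the "Volumes" list in an EC2
    describe_volumes response (hypothetical helper, not from the script).
    """
    mapping = {}
    for volume in volumes:
        if volume.get("Encrypted"):
            continue  # already encrypted, nothing to remediate
        for attachment in volume.get("Attachments", []):
            if attachment.get("InstanceId") == instance_id:
                mapping[attachment["Device"]] = volume["VolumeId"]
    return mapping


# Made-up sample data for illustration only
sample_volumes = [
    {"VolumeId": "vol-aaa", "Encrypted": False,
     "Attachments": [{"InstanceId": "i-123", "Device": "/dev/xvda"}]},
    {"VolumeId": "vol-bbb", "Encrypted": True,
     "Attachments": [{"InstanceId": "i-123", "Device": "/dev/sdf"}]},
]
print(unencrypted_device_map(sample_volumes, "i-123"))
```

Keying the result by device name matters because each new encrypted volume has to be attached at the same device as the volume it replaces.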

I have created a Python script that will automate this entire process end-to-end. The script makes sure to output diagnostic data along the way. I highly recommend using the Linux “tee” command with it to save the output to a file as it runs!
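
If tee is not available (on Windows, for example), the same effect can be approximated from Python. This is only a sketch: the Tee class and run_with_log helper are hypothetical names that do not appear in the script, and the example writes to an in-memory buffer rather than a real log file.

```python
import io
import sys


class Tee:
    """Write everything to several text streams at once (hypothetical helper)."""

    def __init__(self, *streams):
        self.streams = streams

    def write(self, text):
        for stream in self.streams:
            stream.write(text)
        return len(text)

    def flush(self):
        for stream in self.streams:
            stream.flush()


def run_with_log(func, log_stream):
    """Call func() while mirroring anything it prints into log_stream."""
    original_stdout = sys.stdout
    sys.stdout = Tee(original_stdout, log_stream)
    try:
        return func()
    finally:
        # Always restore stdout, even if func() raises
        sys.stdout = original_stdout


# Example with an in-memory log; a real run would pass an open file instead
log = io.StringIO()
run_with_log(lambda: print("Stopping EC2 instance i-123..."), log)
```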

The encrypt-ec2-ebs-vols.py Script

Here is the full source code of the script.

import boto3
import argparse
from prettytable import PrettyTable
import botocore.exceptions  # imported explicitly because WaiterError is referenced below

def get_boto3_session(profile_name=None, region_name=None):
    session_args = {}
    if profile_name:
        print(f"Using AWS profile: {profile_name}")
        session_args['profile_name'] = profile_name
    if region_name:
        print(f"Using AWS region: {region_name}")
        session_args['region_name'] = region_name
    
    session = boto3.Session(**session_args)
    
    return session

def get_instance_state(ec2_client, instance_id):
    """Check if the instance is running or stopped."""
    response = ec2_client.describe_instances(InstanceIds=[instance_id])
    state = response['Reservations'][0]['Instances'][0]['State']['Name']
    return state

def stop_instance(ec2_client, instance_id):
    print(f"Stopping EC2 instance {instance_id}...")
    ec2_client.stop_instances(InstanceIds=[instance_id])
    waiter = ec2_client.get_waiter('instance_stopped')
    waiter.wait(InstanceIds=[instance_id])
    print(f"Instance {instance_id} stopped.")

def start_instance(ec2_client, instance_id):
    print(f"Starting EC2 instance {instance_id}...")
    ec2_client.start_instances(InstanceIds=[instance_id])
    waiter = ec2_client.get_waiter('instance_running')
    waiter.wait(InstanceIds=[instance_id])
    print(f"Instance {instance_id} is running.")

def detach_volumes(ec2_client, ec2_resource, instance_id):
    instance = ec2_resource.Instance(instance_id)
    volume_ids = []
    for volume in instance.volumes.all():
        attachment = volume.attachments[0] if volume.attachments else None
        device_name = attachment['Device'] if attachment else "unknown"
        
        print(f"Detaching volume {volume.id} ({device_name}) from instance {instance_id}...")
        ec2_client.detach_volume(VolumeId=volume.id, InstanceId=instance_id)
        
        waiter = ec2_client.get_waiter('volume_available')
        waiter.wait(VolumeIds=[volume.id])
        
        print(f"Volume {volume.id} ({device_name}) detached and is now available.")
        volume_ids.append(volume.id)
    return volume_ids

def wait_for_snapshot(ec2_client, snapshot_id):
    """Wait up to 24 hours for a snapshot to complete."""
    waiter = ec2_client.get_waiter('snapshot_completed')
    
    # Create a custom waiter config to wait up to 24 hours (1440 attempts at 60-second intervals)
    waiter_config = {
        'Delay': 60,  # Wait 60 seconds between attempts
        'MaxAttempts': 1440  # Wait for up to 1440 attempts (24 hours)
    }
    
    try:
        waiter.wait(SnapshotIds=[snapshot_id], WaiterConfig=waiter_config)
        print(f"Snapshot {snapshot_id} completed.")
    except botocore.exceptions.WaiterError as e:
        print(f"Error: {e}")
        raise

def encrypt_volume(ec2_client, ec2_resource, volume_id, region_name, kms_key_id=None):
    original_volume = ec2_resource.Volume(volume_id)
    
    volume_type = original_volume.volume_type
    volume_size = original_volume.size
    iops = original_volume.iops
    throughput = original_volume.throughput
    availability_zone = original_volume.availability_zone
    
    print(f"Encrypting volume {volume_id} (Type: {volume_type}, Size: {volume_size} GB, IOPS: {iops}, Throughput: {throughput} MB/s)...")
    
    snapshot = ec2_client.create_snapshot(VolumeId=volume_id, Description=f"Snapshot of volume {volume_id} before encryption")
    snapshot_id = snapshot['SnapshotId']
    print(f"Snapshot {snapshot_id} created for volume {volume_id}. Waiting for snapshot to complete...")
    
    # Wait for the snapshot to complete (up to 24 hours)
    wait_for_snapshot(ec2_client, snapshot_id)

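    # Fall back to the client's own region when no region was supplied, and only
    # pass KmsKeyId when a key was given (boto3 rejects None for string parameters).
    # Without KmsKeyId, the account's default EBS encryption key is used.
    copy_params = {
        "SourceSnapshotId": snapshot_id,
        "SourceRegion": region_name or ec2_client.meta.region_name,
        "Encrypted": True,
    }
    if kms_key_id:
        copy_params["KmsKeyId"] = kms_key_id
    encrypted_snapshot = ec2_client.copy_snapshot(**copy_params)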
    encrypted_snapshot_id = encrypted_snapshot['SnapshotId']
    print(f"Encrypted snapshot {encrypted_snapshot_id} created. Waiting for encryption to complete...")

    # Wait for the encrypted snapshot to complete (up to 24 hours)
    wait_for_snapshot(ec2_client, encrypted_snapshot_id)

    volume_creation_params = {
        "SnapshotId": encrypted_snapshot_id,
        "AvailabilityZone": availability_zone,
        "VolumeType": volume_type,
        "Size": volume_size,
        "Encrypted": True,
    }
    # Only pass KmsKeyId when one was supplied; boto3 rejects None values.
    if kms_key_id:
        volume_creation_params["KmsKeyId"] = kms_key_id

    if volume_type in ["gp3", "io1", "io2"]:
        volume_creation_params["Iops"] = iops
    if volume_type == "gp3":
        volume_creation_params["Throughput"] = throughput

    encrypted_volume = ec2_client.create_volume(**volume_creation_params)
    encrypted_volume_id = encrypted_volume['VolumeId']
    print(f"Encrypted volume {encrypted_volume_id} created. Waiting for volume to be available...")

    ec2_client.get_waiter('volume_available').wait(VolumeIds=[encrypted_volume_id])
    print(f"Encrypted volume {encrypted_volume_id} is now available.")
    
    ec2_client.delete_snapshot(SnapshotId=snapshot_id)
    print(f"Deleted original snapshot {snapshot_id}.")
    
    ec2_client.delete_snapshot(SnapshotId=encrypted_snapshot_id)
    print(f"Deleted encrypted snapshot {encrypted_snapshot_id}.")
    
    return encrypted_volume_id

def attach_volumes(ec2_client, ec2_resource, instance_id, encrypted_volumes):
    instance = ec2_resource.Instance(instance_id)
    for device, volume_id in encrypted_volumes.items():
        print(f"Attaching encrypted volume {volume_id} to instance {instance_id} at {device}...")
        ec2_client.attach_volume(InstanceId=instance_id, VolumeId=volume_id, Device=device)
        
        waiter = ec2_client.get_waiter('volume_in_use')
        waiter.wait(VolumeIds=[volume_id])
        
        print(f"Volume {volume_id} attached at {device}.")

def display_volume_mapping(original_volumes, encrypted_volumes):
    table = PrettyTable()
    table.field_names = ["Original Volume ID", "Encrypted Volume ID"]
    
    for device, original_volume_id in original_volumes.items():
        encrypted_volume_id = encrypted_volumes.get(device, "N/A")
        table.add_row([original_volume_id, encrypted_volume_id])
    
    print("\nVolume Mapping (Original vs Encrypted):")
    print(table)

def main(instance_id, profile_name, region_name, kms_key_id):
    session = get_boto3_session(profile_name, region_name)
    
    ec2_client = session.client('ec2')
    ec2_resource = session.resource('ec2')

    # Check if the instance is running
    instance_state = get_instance_state(ec2_client, instance_id)
    instance_was_running = instance_state == 'running'
    print(f"Instance {instance_id} state at the start: {instance_state}")

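    # Refuse to continue from a transitional state (pending, stopping, and so on),
    # because volumes should not be detached while the instance is changing state.
    if instance_state not in ('running', 'stopped'):
        raise SystemExit(f"Instance {instance_id} is {instance_state}; wait until it is running or stopped.")

    # Stop the instance if it was running
    if instance_was_running:
        stop_instance(ec2_client, instance_id)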

    instance = ec2_resource.Instance(instance_id)
    original_volumes = {v.attachments[0]['Device']: v.id for v in instance.volumes.all()}

    detach_volumes(ec2_client, ec2_resource, instance_id)

    encrypted_volumes = {}
    for device, volume_id in original_volumes.items():
        encrypted_volume_id = encrypt_volume(ec2_client, ec2_resource, volume_id, region_name, kms_key_id)
        encrypted_volumes[device] = encrypted_volume_id

    attach_volumes(ec2_client, ec2_resource, instance_id, encrypted_volumes)

    # Only start the instance if it was originally running
    if instance_was_running:
        start_instance(ec2_client, instance_id)

    display_volume_mapping(original_volumes, encrypted_volumes)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Encrypt
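
The listing above ends partway through the argument parser setup. As a rough sketch only, an entry point that feeds main(instance_id, profile_name, region_name, kms_key_id) could look something like the following. Every flag name, help string, and the description text here is an assumption made for illustration, not the original script's interface.

```python
import argparse


def build_parser():
    """Build a CLI parser; every flag name here is an assumption."""
    parser = argparse.ArgumentParser(
        description="Encrypt the EBS volumes attached to an EC2 instance."
    )
    parser.add_argument("--instance-id", required=True, help="EC2 instance ID")
    parser.add_argument("--profile", default=None, help="AWS CLI profile name")
    parser.add_argument("--region", default=None, help="AWS region name")
    parser.add_argument("--kms-key-id", default=None, help="KMS key ID, ARN, or alias")
    return parser


def parse_cli(argv=None):
    """Return the four values that main() in the listing expects, in order."""
    args = build_parser().parse_args(argv)
    return args.instance_id, args.profile, args.region, args.kms_key_id


# Illustrative values only
print(parse_cli(["--instance-id", "i-0123456789abcdef0", "--region", "us-east-1"]))
```

With something like this in place, the script's entry point would call main(*parse_cli()).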

Once the script finishes, re-run the validation script from the previous post to confirm that every volume now reports as encrypted!

Of note:

  • The script will only power on the EC2 instance at the end if it was originally powered on at the start of the script’s execution.
  • The script passes a custom waiter configuration for snapshots (one check every 60 seconds, for up to 24 hours). Some snapshot operations take a very long time, and the default waiter settings would time out well before they finish.
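
The arithmetic behind those waiter numbers is simple: 24 hours of polling at 60-second intervals is 24 × 3600 / 60 = 1440 attempts. Here is a small sketch for deriving the configuration from a time budget; the helper name snapshot_waiter_config is hypothetical and not part of the script.

```python
import math


def snapshot_waiter_config(max_hours, delay_seconds=60):
    """Build a boto3 WaiterConfig dict that polls for roughly max_hours."""
    if max_hours <= 0 or delay_seconds <= 0:
        raise ValueError("max_hours and delay_seconds must be positive")
    # Round up so the total wait is never shorter than requested
    max_attempts = math.ceil(max_hours * 3600 / delay_seconds)
    return {"Delay": delay_seconds, "MaxAttempts": max_attempts}


config = snapshot_waiter_config(24)
print(config)
```

The resulting dictionary can be passed as WaiterConfig to waiter.wait(...), the same way wait_for_snapshot does in the script.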

Disclaimer: The opinions expressed herein are my own personal thoughts and do not represent the views of any present or past employer in any way.