Remediating Unencrypted EBS Volumes: Encryption in Action
Recap: Preparing for Full Encryption
In the last two posts, I discussed the importance of encrypting data at rest and how to identify unencrypted EBS volumes attached to EC2 instances by using a Python script. After generating a report, I outlined the initial steps for remediation, which included gathering information about unencrypted volumes.
Now, it’s time to take action in an automated fashion. In this post, I’ll guide you through the process of actually encrypting unencrypted EBS volumes using the encrypt-ec2-ebs-vols.py Python script. This script automates the encryption process, ensuring that your EC2 instances’ volumes are fully secured. Note that EC2 instances will be powered off during these operations, and I’ll handle instances differently based on their initial state.
Introducing the encrypt-ec2-ebs-vols.py Script
Remember, the steps required to encrypt an EBS volume that is attached to an EC2 instance are:
Determine the KMS Key to Use: Have your end state in mind before you pick a key. If your goal is simplicity, you can use the AWS-managed default key for EBS (aws/ebs) to encrypt your volumes. However, if you plan to use a service like AWS Backup to copy recovery points containing these volumes across AWS accounts or Regions, you will need to adjust your KMS key strategy accordingly, because the AWS-managed default key cannot be used for cross-account copies. I highly recommend creating a new multi-Region, customer managed KMS key exclusively for this purpose.
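As a sketch of that recommendation, here is one way to create such a multi-Region key with boto3. The alias name and description are my own placeholders, not part of the script below; adapt them to your naming scheme.

```python
def create_multi_region_ebs_key(kms_client, alias_name="alias/ebs-encryption-mrk"):
    """Create a multi-Region KMS key for EBS encryption and give it an alias.

    The alias name above is illustrative; pick one that fits your conventions.
    """
    key = kms_client.create_key(
        Description="Multi-Region key for EBS volume encryption",
        KeyUsage="ENCRYPT_DECRYPT",
        MultiRegion=True,
    )
    key_id = key["KeyMetadata"]["KeyId"]
    # An alias makes the key easy to reference from scripts and the console
    kms_client.create_alias(AliasName=alias_name, TargetKeyId=key_id)
    return key_id

# Usage (assumes AWS credentials are configured):
# import boto3
# create_multi_region_ebs_key(boto3.client("kms"))
```

Multi-Region keys can later be replicated into other Regions with `replicate_key`, which is what makes cross-Region AWS Backup copies straightforward.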
Power Down the EC2 Instance: For safety and to ensure data integrity, stop the EC2 instance. This prevents any changes to the volume during the encryption process. It is important to understand that there is no way to re-encrypt a volume live, in place; you must plan for downtime accordingly. The amount of downtime for each instance depends on a number of factors, such as the number of EBS volumes attached, their types and sizes, and how much of each volume is in active use (in terms of blocks written).
Detach the EBS Volumes: Using the AWS Management Console or AWS CLI, detach the unencrypted EBS volumes from the instance.
Create an Unencrypted Snapshot: Before making any modifications, create a snapshot of the unencrypted volume. This acts as a backup and ensures that no data is lost during the process.
Copy the Snapshot with Encryption: Create a new encrypted snapshot from the unencrypted one. This step uses your KMS key to encrypt the data.
Create a New EBS Volume: Once you have the encrypted snapshot, create a new EBS volume from it.
Reattach the Encrypted EBS Volume: Attach the newly encrypted EBS volume to your EC2 instance.
Power the EC2 Instance Back On: Finally, restart the instance, and it will now have encrypted EBS volumes attached.
I have created a Python script that automates this entire process end to end, printing diagnostic output along the way. I highly recommend piping its output through the Linux “tee” command to save a copy to a file as it runs!
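If you would rather have Python capture its own log instead of relying on a shell pipeline, here is a minimal tee-like sketch. This Tee class and the log path are illustrative, not part of the script below.

```python
import sys

class Tee:
    """Duplicate every write to both an underlying stream and a log file."""

    def __init__(self, stream, log_path):
        self.stream = stream
        self.log = open(log_path, "a")

    def write(self, text):
        self.stream.write(text)
        self.log.write(text)

    def flush(self):
        self.stream.flush()
        self.log.flush()

# Redirect all subsequent print() output for the rest of the run:
# sys.stdout = Tee(sys.stdout, "encrypt-run.log")
```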
The encrypt-ec2-ebs-vols.py Script
Here is the full source code of the script.
import argparse

import boto3
import botocore
from prettytable import PrettyTable

def get_boto3_session(profile_name=None, region_name=None):
    session_args = {}
    if profile_name:
        print(f"Using AWS profile: {profile_name}")
        session_args['profile_name'] = profile_name
    if region_name:
        print(f"Using AWS region: {region_name}")
        session_args['region_name'] = region_name
    session = boto3.Session(**session_args)
    return session

def get_instance_state(ec2_client, instance_id):
    """Check if the instance is running or stopped."""
    response = ec2_client.describe_instances(InstanceIds=[instance_id])
    state = response['Reservations'][0]['Instances'][0]['State']['Name']
    return state

def stop_instance(ec2_client, instance_id):
    print(f"Stopping EC2 instance {instance_id}...")
    ec2_client.stop_instances(InstanceIds=[instance_id])
    waiter = ec2_client.get_waiter('instance_stopped')
    waiter.wait(InstanceIds=[instance_id])
    print(f"Instance {instance_id} stopped.")

def start_instance(ec2_client, instance_id):
    print(f"Starting EC2 instance {instance_id}...")
    ec2_client.start_instances(InstanceIds=[instance_id])
    waiter = ec2_client.get_waiter('instance_running')
    waiter.wait(InstanceIds=[instance_id])
    print(f"Instance {instance_id} is running.")

def detach_volumes(ec2_client, ec2_resource, instance_id):
    instance = ec2_resource.Instance(instance_id)
    volume_ids = []
    for volume in instance.volumes.all():
        attachment = volume.attachments[0] if volume.attachments else None
        device_name = attachment['Device'] if attachment else "unknown"
        print(f"Detaching volume {volume.id} ({device_name}) from instance {instance_id}...")
        ec2_client.detach_volume(VolumeId=volume.id, InstanceId=instance_id)
        waiter = ec2_client.get_waiter('volume_available')
        waiter.wait(VolumeIds=[volume.id])
        print(f"Volume {volume.id} ({device_name}) detached and is now available.")
        volume_ids.append(volume.id)
    return volume_ids

def wait_for_snapshot(ec2_client, snapshot_id):
    """Wait up to 24 hours for a snapshot to complete."""
    waiter = ec2_client.get_waiter('snapshot_completed')
    # Custom waiter config: wait up to 24 hours (1440 attempts at 60-second intervals)
    waiter_config = {
        'Delay': 60,          # Wait 60 seconds between attempts
        'MaxAttempts': 1440   # Up to 1440 attempts (24 hours)
    }
    try:
        waiter.wait(SnapshotIds=[snapshot_id], WaiterConfig=waiter_config)
        print(f"Snapshot {snapshot_id} completed.")
    except botocore.exceptions.WaiterError as e:
        print(f"Error: {e}")
        raise

def encrypt_volume(ec2_client, ec2_resource, volume_id, region_name, kms_key_id=None):
    original_volume = ec2_resource.Volume(volume_id)
    volume_type = original_volume.volume_type
    volume_size = original_volume.size
    iops = original_volume.iops
    throughput = original_volume.throughput
    availability_zone = original_volume.availability_zone
    print(f"Encrypting volume {volume_id} (Type: {volume_type}, Size: {volume_size} GB, IOPS: {iops}, Throughput: {throughput} MB/s)...")
    snapshot = ec2_client.create_snapshot(VolumeId=volume_id, Description=f"Snapshot of volume {volume_id} before encryption")
    snapshot_id = snapshot['SnapshotId']
    print(f"Snapshot {snapshot_id} created for volume {volume_id}. Waiting for snapshot to complete...")
    # Wait for the snapshot to complete (up to 24 hours)
    wait_for_snapshot(ec2_client, snapshot_id)
    # Omit KmsKeyId entirely when no key was supplied, so AWS falls back to the
    # default aws/ebs key (boto3 rejects an explicit None for this parameter)
    kms_params = {'KmsKeyId': kms_key_id} if kms_key_id else {}
    encrypted_snapshot = ec2_client.copy_snapshot(
        SourceSnapshotId=snapshot_id,
        SourceRegion=region_name,
        Encrypted=True,
        **kms_params
    )
    encrypted_snapshot_id = encrypted_snapshot['SnapshotId']
    print(f"Encrypted snapshot {encrypted_snapshot_id} created. Waiting for encryption to complete...")
    # Wait for the encrypted snapshot to complete (up to 24 hours)
    wait_for_snapshot(ec2_client, encrypted_snapshot_id)
    volume_creation_params = {
        "SnapshotId": encrypted_snapshot_id,
        "AvailabilityZone": availability_zone,
        "VolumeType": volume_type,
        "Size": volume_size,
        "Encrypted": True,
        **kms_params  # Use the provided KMS key alias or ID, if any
    }
    if volume_type in ["gp3", "io1", "io2"]:
        volume_creation_params["Iops"] = iops
    if volume_type == "gp3":
        volume_creation_params["Throughput"] = throughput
    encrypted_volume = ec2_client.create_volume(**volume_creation_params)
    encrypted_volume_id = encrypted_volume['VolumeId']
    print(f"Encrypted volume {encrypted_volume_id} created. Waiting for volume to be available...")
    ec2_client.get_waiter('volume_available').wait(VolumeIds=[encrypted_volume_id])
    print(f"Encrypted volume {encrypted_volume_id} is now available.")
    ec2_client.delete_snapshot(SnapshotId=snapshot_id)
    print(f"Deleted original snapshot {snapshot_id}.")
    ec2_client.delete_snapshot(SnapshotId=encrypted_snapshot_id)
    print(f"Deleted encrypted snapshot {encrypted_snapshot_id}.")
    return encrypted_volume_id

def attach_volumes(ec2_client, ec2_resource, instance_id, encrypted_volumes):
    for device, volume_id in encrypted_volumes.items():
        print(f"Attaching encrypted volume {volume_id} to instance {instance_id} at {device}...")
        ec2_client.attach_volume(InstanceId=instance_id, VolumeId=volume_id, Device=device)
        waiter = ec2_client.get_waiter('volume_in_use')
        waiter.wait(VolumeIds=[volume_id])
        print(f"Volume {volume_id} attached at {device}.")

def display_volume_mapping(original_volumes, encrypted_volumes):
    table = PrettyTable()
    table.field_names = ["Original Volume ID", "Encrypted Volume ID"]
    for device, original_volume_id in original_volumes.items():
        encrypted_volume_id = encrypted_volumes.get(device, "N/A")
        table.add_row([original_volume_id, encrypted_volume_id])
    print("\nVolume Mapping (Original vs Encrypted):")
    print(table)

def main(instance_id, profile_name, region_name, kms_key_id):
    session = get_boto3_session(profile_name, region_name)
    ec2_client = session.client('ec2')
    ec2_resource = session.resource('ec2')
    # Check whether the instance is running
    instance_state = get_instance_state(ec2_client, instance_id)
    instance_was_running = instance_state == 'running'
    print(f"Instance {instance_id} state at the start: {instance_state}")
    # Stop the instance if it was running
    if instance_was_running:
        stop_instance(ec2_client, instance_id)
    instance = ec2_resource.Instance(instance_id)
    # Record the device mapping before detaching so each encrypted volume can be
    # reattached at the same device name
    original_volumes = {v.attachments[0]['Device']: v.id for v in instance.volumes.all()}
    detach_volumes(ec2_client, ec2_resource, instance_id)
    encrypted_volumes = {}
    for device, volume_id in original_volumes.items():
        encrypted_volume_id = encrypt_volume(ec2_client, ec2_resource, volume_id, region_name, kms_key_id)
        encrypted_volumes[device] = encrypted_volume_id
    attach_volumes(ec2_client, ec2_resource, instance_id, encrypted_volumes)
    # Only start the instance if it was originally running
    if instance_was_running:
        start_instance(ec2_client, instance_id)
    display_volume_mapping(original_volumes, encrypted_volumes)

if __name__ == "__main__":
    # The argparse block was truncated in the original post; the flags below are
    # reconstructed from the parameters that main() expects.
    parser = argparse.ArgumentParser(description="Encrypt the EBS volumes attached to an EC2 instance.")
    parser.add_argument("--instance-id", required=True, help="ID of the EC2 instance whose volumes should be encrypted")
    parser.add_argument("--profile", default=None, help="AWS profile name to use")
    parser.add_argument("--region", required=True, help="AWS region the instance lives in")
    parser.add_argument("--kms-key-id", default=None, help="KMS key ID or alias for encryption (omit to use the default aws/ebs key)")
    args = parser.parse_args()
    main(args.instance_id, args.profile, args.region, args.kms_key_id)
Once the script finishes, re-run the validation script from the previous post to confirm that no unencrypted volumes remain.
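For a quick spot check of a single instance, you can also ask the EC2 API directly whether every attached volume now reports as encrypted. This helper is a hypothetical addition of mine, separate from both scripts; the instance ID in the usage comment is a placeholder.

```python
def all_volumes_encrypted(ec2_client, instance_id):
    """Return True when every EBS volume attached to the instance reports Encrypted=True."""
    response = ec2_client.describe_volumes(
        Filters=[{"Name": "attachment.instance-id", "Values": [instance_id]}]
    )
    volumes = response["Volumes"]
    # An instance with no volumes at all is treated as a failed check
    return bool(volumes) and all(v["Encrypted"] for v in volumes)

# Usage (assumes AWS credentials are configured):
# import boto3
# print(all_volumes_encrypted(boto3.client("ec2"), "i-0123456789abcdef0"))
```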
Of note:
- The script will only power on the EC2 instance at the end if it was originally powered on at the start of the script’s execution.
- The script uses a custom waiter configuration for snapshot operations. This avoids client-side timeout errors from the AWS API, since copying and encrypting large snapshots can take a very long time.