Coding Security Autoremediations

For today's lab we’ll roll up our sleeves, pull out our development tools, and, well... mostly copy and paste, as you learn some techniques for coding security autoremediation.

Watch Me Code This Live with AI (Pro Content)!

CloudSLAW is, and always will be, free. But to help cover costs and keep the content up to date, we have an optional Patreon. For this lab, Pro subscribers got to watch me set up my toolchain for writing and testing the code, and see how I used AI to help. I was able to knock out all the code in an hour, when it would previously have taken me half a day. Like all Pro content, the video was recorded and is available to new subscribers. Check it out!

Prerequisites

  • This is part 8 of the Advanced Cloud Security Problem Solving series, which I’ve been abbreviating to Epic Automation. If you haven’t completed parts 1-7 yet, jump back to the first post in the series and complete all prior posts before trying this one.

The Lesson

This is it folks, the culmination of months of work! (Or days, for you overachievers blasting through the website). We have everything set up, and today we’ll update permissions and deploy the code to enforce our desired outcome.

Oh, you forgot that? Me too! Let’s review…

If an S3 bucket is tagged with a classification of sensitive, only allow access from within our AWS Organization.

We originally hoped to set this using a Resource Control Policy, but there’s a limitation in IAM and RCP conditions: they don’t support S3 bucket tags. But we (okay, I) decided we could achieve the same objective with an autoremediation guardrail which would trigger whenever anyone tags a bucket or makes permissions changes to a bucket. While policies like SCPs and RCPs are great because they prevent something from happening, they can’t really handle all the complex scenarios we might encounter. For those situations we can use a corrective control which fixes something after the fact… often nearly instantly, thanks to the power of cloud.

Here’s the logic we built out. We monitor CloudTrail for certain API calls, use them as triggers, assess the bucket, then apply corrective controls to lock it down.
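As a quick refresher, the trigger from the earlier labs is an EventBridge rule that matches S3 management events recorded in CloudTrail. Here’s an illustrative sketch of that kind of event pattern, written as a Python dict; the exact eventName list is whatever you configured back in the trigger lab, so treat these names as examples:

# Illustrative only: an EventBridge event pattern along the lines of the one
# built in the earlier trigger lab. Your exact eventName list may differ.
s3_trigger_pattern = {
    "source": ["aws.s3"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
        "eventSource": ["s3.amazonaws.com"],
        "eventName": [
            "PutBucketTagging",
            "DeleteBucketTagging",
            "PutBucketPolicy",
            "DeleteBucketPolicy",
            "PutBucketAcl",
            "PutBucketPublicAccessBlock"
        ]
    }
}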

To this point we’ve set triggers, linked them in to run our Lambda function (code) to get the bucket’s information, and established our cross-account access and role chaining permissions.

All that’s left is the remediation code itself. And really, there isn’t much more to say about it in terms of a general lesson. But here are some points to note:

  • The main body is lambda_handler. It receives the event plus the Lambda runtime context.

  • We extract the account ID of the account where the event occurred, and extract the name of the bucket from the event.

  • The function then assumes our target role in the target account, and the get_bucket_info function gathers all the information we want to make our security decisions, including the current tags.

  • If the bucket is tagged with classification:sensitive, then go “do things”.

    • Turn on Block Public Access (BPA), disable ACLs, and check the bucket policy.

    • Retrieve the bucket policy (if one exists) and add a statement to the bucket policy to restrict access to the current organization and AWS services.

      • For this to work the function needs to know your Organization ID. Since this account (SecurityOperations) does not have permissions for Organizations, it doesn’t have a way to pull the ID. Instead we will set it as an environment variable.

To make the lab easier to understand, I wrote all the code in a single file that you can copy and paste into the console. The code is well-commented and should be easy to follow even if you are new to Python. Start in lambda_handler at the bottom, and you can walk through the logic and the various functions.

This should work well in a lot of situations, but it doesn’t include any IP-address-based exceptions. This means it blocks all IP-based access and only allows API access from principals inside your organization! We could write more complex code to evaluate IP addresses and allow them if they are private-network-only or from a specific IP address range, but that was beyond our current requirements. Just keep that very important limitation in mind if you use this technique yourself.
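If you ever do need an IP-based carve-out, it’s just one more condition on the deny statement in the code below. Here’s a minimal sketch using a made-up trusted CIDR; this is not something the lab code does:

# Hypothetical tweak: exempt a trusted CIDR from the org-only deny statement.
# Conditions within a statement are ANDed, so NotIpAddress means the deny
# only applies to requests coming from OUTSIDE this range.
TRUSTED_CIDR = "203.0.113.0/24"  # made-up example range

org_deny_statement["Condition"]["NotIpAddress"] = {
    "aws:SourceIp": [TRUSTED_CIDR]
}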

Another quick note on the code: you’ll notice that I don’t rely on any bucket information in the event, other than the bucket name and account ID. Every time I create an autoremediation, I always check the current state of the resource before making any changes. Why? Because life comes at you fast, especially in cloud, and I always want to operate knowing the real-time state of a resource.

Also, after my first pass I needed to go back through and make sure that the code only makes changes if the expected security isn’t in place. Why? The first version (in one spot) just set the secure condition every time, even if it was already there. But this change API call was picked up as one of our triggers, which ran the lambda again, which made the change again, and… infinite loops in the cloud are fun!

When you play battle bots in the cloud, only Amazon’s accounting department wins!

As I mentioned above, it took me about an hour to code up all the remediations with the help of an AI tool. I used our test event to trigger each test, but I specified a target bucket I could safely modify to play with different conditions. You can check out the Pro content in the Patreon for a deeper walkthrough, and to see how I built everything.

Finally, to make this work we need to update permissions and set that environment variable… both of which are in the lab.

Key Lesson Points

  • Always test your code on real resources in a test environment whenever you can. Local testing stacks are great, but don’t totally rely on them.

  • Consider using multiple browsers (or terminal shells) logged into different accounts so you can test without having to bounce around too much.

    • I show that in the Pro content, but we didn’t really need it for this lab. For deeper testing I write command line scripts instead of using the browser.

  • Watch out for infinite loops: make sure your changes don’t continuously trigger your app code.

    • There’s a different architecture that can handle this a little better, which we didn’t use: tracking state with SQS and DynamoDB (sketched below). That’s much more complex to set up correctly, and the small amount of re-triggering our simpler architecture causes won’t meaningfully increase costs.
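For the curious, the state-tracking approach usually boils down to a conditional write: record that you just remediated a bucket, and bail out if a fresh record already exists. A minimal sketch, assuming a hypothetical DynamoDB table named remediation-state with a bucket_name partition key (we did not build this in the lab):

import time
import boto3
from botocore.exceptions import ClientError

def already_remediated_recently(bucket_name, window_seconds=300):
    """Return True if this bucket was remediated within the window.
    Assumes a hypothetical 'remediation-state' table keyed on bucket_name;
    this is not part of the lab's architecture."""
    table = boto3.resource('dynamodb').Table('remediation-state')
    now = int(time.time())
    try:
        # Conditional put: only succeeds if no fresh record exists.
        table.put_item(
            Item={'bucket_name': bucket_name, 'last_remediated': now},
            ConditionExpression='attribute_not_exists(bucket_name) OR last_remediated < :cutoff',
            ExpressionAttributeValues={':cutoff': now - window_seconds}
        )
        return False  # we won the write, so go ahead and remediate
    except ClientError as e:
        if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return True  # a fresh record exists -- skip to avoid looping
        raise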

The Lab

We will do 3 things in this lab, and I recommend an optional 4th step:

  1. Get our org ID and set it as an environment variable for our Lambda function.

  2. Update our code with the new code I wrote that actually sets the security conditions.

  3. Update our automation role’s permissions, now that we’ve finished the code and know exactly what it will do.

  4. Make changes to our test bucket and watch the magic work!

Video Walkthrough

Step-by-Step

Start in your Sign-in portal > CloudSLAW > AdministratorAccess > Organizations > copy your Organization ID:
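(If you’d rather grab it from code, a couple of boto3 lines run with credentials for the management account do the same thing:)

import boto3

# Requires credentials for the management (CloudSLAW) account, which has Organizations access.
org_client = boto3.client('organizations')
org_id = org_client.describe_organization()['Organization']['Id']
print(org_id)  # looks like o-xxxxxxxxxx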

Close that tab > Sign-in portal > SecurityOperations > AdministratorAccess > Lambda > security-auto-s3:

Go to Configuration > Environment variables > Edit:

Then Add environment variable (click that once to show the text fields) and Key: org_id; Value: your organization’s ID > Save:
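(The console is all you need, but for the script-inclined, boto3 can set the same variable. Note that update_function_configuration replaces the function’s entire environment, so include any other variables you rely on:)

import boto3

# Requires credentials for the SecurityOperations account.
lambda_client = boto3.client('lambda')
lambda_client.update_function_configuration(
    FunctionName='security-auto-s3',
    Environment={'Variables': {'org_id': 'o-xxxxxxxxxx'}}  # paste your real Organization ID
)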

Then go to Code > copy and paste all the code below to replace the existing code > Deploy:

import json
import boto3
import re
import logging
from botocore.exceptions import ClientError
import os

# Set up logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def assume_role(account_id):
    """
    Assume the SecurityAutoremediation role in the target account
    """
    role_arn = f"arn:aws:iam::{account_id}:role/SecurityOperations/SecurityAutoremediation"
    logger.info(f"Assuming role: {role_arn}")
    
    sts_client = boto3.client('sts')
    
    try:
        response = sts_client.assume_role(
            RoleArn=role_arn,
            RoleSessionName="S3SecurityAnalysis"
        )
        
        credentials = response['Credentials']
        
        return {
            'aws_access_key_id': credentials['AccessKeyId'],
            'aws_secret_access_key': credentials['SecretAccessKey'],
            'aws_session_token': credentials['SessionToken']
        }
    except Exception as e:
        logger.error(f"Failed to assume role {role_arn}: {str(e)}")
        raise

def extract_bucket_name(event):
    """
    Extract S3 bucket name from event by checking multiple possible locations
    """
    # Check in resources array
    if 'resources' in event['detail'] and event['detail']['resources']:
        for resource in event['detail']['resources']:
            if resource.get('type') == 'AWS::S3::Bucket' and resource.get('ARN'):
                arn = resource.get('ARN')
                bucket_match = re.search(r'arn:aws:s3:::([^/]+)', arn)
                if bucket_match:
                    return bucket_match.group(1)
    
    # Check in requestParameters
    if 'requestParameters' in event['detail']:
        req_params = event['detail']['requestParameters']
        if req_params and 'bucketName' in req_params:
            return req_params['bucketName']
    
    # Check in responseElements
    if 'responseElements' in event['detail'] and event['detail']['responseElements']:
        resp_elements = event['detail']['responseElements']
        # Various response element formats
        if 'x-amz-bucket-region' in resp_elements:
            bucket_match = re.search(r'arn:aws:s3:::([^/]+)', 
                                    str(resp_elements))
            if bucket_match:
                return bucket_match.group(1)
    
    return None

def get_bucket_info(bucket_name, session):
    """
    Get comprehensive information about an S3 bucket using the provided session
    """
    s3_client = session.client('s3')
    sts_client = session.client('sts')
    
    # Get the account ID from the assumed role session
    account_id = sts_client.get_caller_identity().get('Account')
    
    result = {
        'bucket_name': bucket_name,
        'arn': f'arn:aws:s3:::{bucket_name}',
        'account_id': account_id,
        'tags': None,
        'has_bucket_policy': False,
        'bucket_policy': None,
        'acls_enabled': True,  # Default to True; refined below by checking ownership controls
        'acls': None,
        'block_public_access_bucket': None
    }
    
    # Get bucket tags
    try:
        tags_response = s3_client.get_bucket_tagging(Bucket=bucket_name)
        result['tags'] = tags_response.get('TagSet', [])
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchTagSet':
            result['tags'] = []
        else:
            logger.warning(f"Error getting bucket tags: {str(e)}")
    
    # Get bucket policy
    try:
        policy_response = s3_client.get_bucket_policy(Bucket=bucket_name)
        result['has_bucket_policy'] = True
        result['bucket_policy'] = json.loads(policy_response.get('Policy', '{}'))
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchBucketPolicy':
            result['has_bucket_policy'] = False
        else:
            logger.warning(f"Error getting bucket policy: {str(e)}")
    
    # Get bucket ACLs and determine if ACLs are enabled
    try:
        acl_response = s3_client.get_bucket_acl(Bucket=bucket_name)
        result['acls'] = acl_response.get('Grants', [])
        
        # Check bucket ownership controls to determine if ACLs are enabled
        try:
            ownership = s3_client.get_bucket_ownership_controls(Bucket=bucket_name)
            rules = ownership.get('OwnershipControls', {}).get('Rules', [])
            for rule in rules:
                if rule.get('ObjectOwnership') == 'BucketOwnerEnforced':
                    result['acls_enabled'] = False
                    break
        except ClientError as e:
            if e.response['Error']['Code'] != 'OwnershipControlsNotFoundError':
                logger.warning(f"Error getting bucket ownership: {str(e)}")
    except ClientError as e:
        logger.warning(f"Error getting bucket ACL: {str(e)}")
    
    # Get block public access settings for bucket
    try:
        bpa_response = s3_client.get_public_access_block(Bucket=bucket_name)
        result['block_public_access_bucket'] = bpa_response.get('PublicAccessBlockConfiguration', {})
    except ClientError as e:
        if e.response['Error']['Code'] != 'NoSuchPublicAccessBlockConfiguration':
            logger.warning(f"Error getting bucket public access block: {str(e)}")
    
    return result

def remediate_bpa(session: boto3.Session, bucket_name: str) -> dict:
    """
    Enable S3 Block Public Access (BPA) for a specified S3 bucket.
    
    Args:
        session (boto3.Session): The boto3 session with assumed role credentials
        bucket_name (str): The name of the S3 bucket
        
    Returns:
        dict: Response from the API call
    """
    try:
        s3_client = session.client('s3')
        
        response = s3_client.put_public_access_block(
            Bucket=bucket_name,
            PublicAccessBlockConfiguration={
                'BlockPublicAcls': True,
                'IgnorePublicAcls': True,
                'BlockPublicPolicy': True,
                'RestrictPublicBuckets': True
            }
        )
        
        logger.info(f"Successfully enabled Block Public Access for bucket: {bucket_name}")
        return response
        
    except Exception as e:
        logger.error(f"Failed to enable Block Public Access for bucket {bucket_name}: {str(e)}")
        raise e

def remediate_acls(session: boto3.Session, bucket_name: str) -> dict:
    """
    Disable ACLs for a specified S3 bucket by setting ObjectOwnership to BucketOwnerEnforced.
    Only makes changes if ACLs are currently enabled.
    
    Args:
        session (boto3.Session): The boto3 session with assumed role credentials
        bucket_name (str): The name of the S3 bucket
        
    Returns:
        dict: Response from the API call or None if no change needed
    """
    try:
        s3_client = session.client('s3')
        
        # Check current ownership controls
        try:
            ownership = s3_client.get_bucket_ownership_controls(Bucket=bucket_name)
            rules = ownership.get('OwnershipControls', {}).get('Rules', [])
            for rule in rules:
                if rule.get('ObjectOwnership') == 'BucketOwnerEnforced':
                    logger.info(f"ACLs already disabled for bucket: {bucket_name}")
                    return None
        except ClientError as e:
            if e.response['Error']['Code'] != 'OwnershipControlsNotFoundError':
                raise
        
        # If we get here, either no ownership controls exist or they're not BucketOwnerEnforced
        response = s3_client.put_bucket_ownership_controls(
            Bucket=bucket_name,
            OwnershipControls={
                'Rules': [
                    {
                        'ObjectOwnership': 'BucketOwnerEnforced'
                    }
                ]
            }
        )
        
        logger.info(f"Successfully disabled ACLs for bucket: {bucket_name}")
        return response
        
    except Exception as e:
        logger.error(f"Failed to disable ACLs for bucket {bucket_name}: {str(e)}")
        raise e

def remediate_bucket_policy(session: boto3.Session, bucket_name: str, current_policy: dict) -> dict:
    """
    Check bucket policy size and add organization restriction for sensitive buckets.
    Allows AWS service principals when acting on behalf of the organization.
    Only updates policy if needed.
    
    Args:
        session (boto3.Session): The boto3 session with assumed role credentials
        bucket_name (str): The name of the S3 bucket
        current_policy (dict): The current bucket policy or empty dict
        
    Returns:
        dict: Response from the API call or None if no change needed
    """
    try:
        s3_client = session.client('s3')
        org_id = os.environ.get('org_id')
        
        if not org_id:
            raise ValueError("org_id environment variable is not set")
        
        # Create org deny statement with service principal exception
        org_deny_statement = {
            "Sid": "DenyAccessOutsideOrg",
            "Effect": "Deny",
            "Principal": "*",
            "Action": "s3:*",
            "Resource": [
                f"arn:aws:s3:::{bucket_name}",
                f"arn:aws:s3:::{bucket_name}/*"
            ],
            "Condition": {
                "StringNotEquals": {
                    "aws:PrincipalOrgID": org_id
                },
                "StringNotLike": {
                    "aws:PrincipalArn": [
                        "arn:aws:iam::*:role/aws-service-role/*",
                        "arn:aws:iam::*:role/service-role/*"
                    ]
                },
                "Bool": {
                    "aws:PrincipalIsAWSService": "false"
                }
            }
        }
        
        # Check if org restriction already exists
        org_restriction_exists = any(
            statement.get('Sid') == "DenyAccessOutsideOrg" and
            statement.get('Effect') == "Deny" and
            statement.get('Condition', {}).get('StringNotEquals', {}).get('aws:PrincipalOrgID') == org_id
            for statement in current_policy.get('Statement', [])
        )
        
        if org_restriction_exists:
            logger.info(f"Organization restriction already exists in bucket policy for: {bucket_name}")
            return None
            
        # If we get here, we need to add the org restriction
        # Ensure policy has basic structure
        if not current_policy:
            current_policy = {
                "Version": "2012-10-17",
                "Id": f"SecurityPolicy-{bucket_name}",
                "Statement": [org_deny_statement]
            }
        else:
            # Ensure policy has correct version and ID
            current_policy['Version'] = "2012-10-17"
            if 'Id' not in current_policy:
                current_policy['Id'] = f"SecurityPolicy-{bucket_name}"
            if 'Statement' not in current_policy:
                current_policy['Statement'] = []
            current_policy['Statement'].append(org_deny_statement)
        
        # Check policy size before applying
        policy_size = len(json.dumps(current_policy))
        if policy_size > 19456:  # 19K in bytes
            logger.error(f"Bucket policy for {bucket_name} is too large ({policy_size} bytes). Cannot add org restriction.")
            return None
            
        # Apply updated policy
        response = s3_client.put_bucket_policy(
            Bucket=bucket_name,
            Policy=json.dumps(current_policy)
        )
        
        logger.info(f"Successfully added organization restriction to bucket policy for: {bucket_name}")
        return response
        
    except Exception as e:
        logger.error(f"Failed to update bucket policy for bucket {bucket_name}: {str(e)}")
        raise e

def lambda_handler(event, context):
    """
    Main Lambda handler function
    """
    try:
        logger.info(f"Processing event: {json.dumps(event)}")
        
        # Extract account ID from the event
        if 'account' in event:
            account_id = event['account']
        else:
            logger.error("Could not extract account ID from event")
            return {
                'statusCode': 400,
                'body': json.dumps('Could not extract account ID from event')
            }
        
        logger.info(f"Extracted account ID: {account_id}")
        
        # Assume role in the target account
        try:
            credentials = assume_role(account_id)
            session = boto3.Session(**credentials)
        except Exception as e:
            logger.error(f"Failed to create cross-account session: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Failed to create cross-account session: {str(e)}')
            }
        
        # For EventBridge events, format is slightly different
        if 'detail' in event:
            # Extract bucket name from CloudTrail event
            bucket_name = extract_bucket_name(event)
        else:
            # Direct S3 event format
            bucket_name = extract_bucket_name({'detail': event})
        
        if not bucket_name:
            logger.error("Could not extract bucket name from event")
            return {
                'statusCode': 400,
                'body': json.dumps('Could not extract bucket name from event')
            }
        
        logger.info(f"Extracted bucket name: {bucket_name}")
        
        # Get information about the bucket using the cross-account session
        bucket_info = get_bucket_info(bucket_name, session)
        logger.info(f"Results: {bucket_info}")
        
        # Check if bucket has sensitive classification tag
        is_sensitive = any(
            tag.get('Key') == 'classification' and tag.get('Value') == 'sensitive'
            for tag in bucket_info.get('tags', [])
        )
        
        if is_sensitive:
            logger.info(f"Bucket {bucket_name} is tagged as sensitive")
            
            # Check if BPA is already fully enabled
            current_bpa = bucket_info.get('block_public_access_bucket', {})
            bpa_fully_enabled = all([
                current_bpa.get('BlockPublicAcls', False),
                current_bpa.get('IgnorePublicAcls', False),
                current_bpa.get('BlockPublicPolicy', False),
                current_bpa.get('RestrictPublicBuckets', False)
            ])
            
            if not bpa_fully_enabled:
                logger.info(f"Enabling Block Public Access for sensitive bucket {bucket_name}")
                remediation_response = remediate_bpa(session, bucket_name)
                bucket_info['remediation_response'] = remediation_response
            else:
                logger.info(f"Block Public Access already fully enabled for bucket {bucket_name}")
                
            # Check if ACLs are enabled and disable them if they are
            if bucket_info.get('acls_enabled', True):
                logger.info(f"Disabling ACLs for sensitive bucket {bucket_name}")
                acl_response = remediate_acls(session, bucket_name)
                bucket_info['acl_remediation_response'] = acl_response
            else:
                logger.info(f"ACLs already disabled for bucket {bucket_name}")
            
            # Add or update bucket policy with org restrictions
            current_policy = bucket_info.get('bucket_policy', {}) if bucket_info.get('has_bucket_policy') else {}
            policy_response = remediate_bucket_policy(session, bucket_name, current_policy)
            if policy_response:
                bucket_info['policy_remediation_response'] = policy_response
        else:
            logger.info(f"Bucket {bucket_name} is not tagged as sensitive")
            
        return {
            'statusCode': 200,
            'body': json.dumps(bucket_info, default=str)
        }
        
    except Exception as e:
        logger.error(f"Error processing event: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error: {str(e)}')
        }

The code is pretty well commented. And yes, I used AI for a lot of it (I haz a day job, ya know!). To follow the logic, start in lambda_handler; you can see the flow and how we extract the account ID and the bucket name, then assumerole into the account, then pull the bucket data, then make any remediations if the tag is present.

You can hit Test now if you want, but even if your bucket is tagged sensitive, not all the remediations will work yet, because we need to fix permissions.
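(If you prefer to trigger it from code instead of the console Test button, something like this works, assuming you saved your test event from the earlier lab as test-event.json:)

import json
import boto3

# Requires credentials for the SecurityOperations account and a saved test event.
lambda_client = boto3.client('lambda')
with open('test-event.json') as f:
    payload = f.read()

response = lambda_client.invoke(
    FunctionName='security-auto-s3',
    Payload=payload
)
print(json.loads(response['Payload'].read()))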

Let’s go fix those permissions! We’ll deploy a new StackSet with an updated template.

CloudFormation > StackSets > Service-managed > AutoremediationRole:

The first thing we need is the OU ID where the stack is deployed. Copy the OU ID and paste it into a text editor (we’ll also need to copy the URL to the updated template in a second):

Then Actions > Edit StackSet details:

Select Replace current template, then under Specify template, paste in this URL: https://cloudslaw.s3-us-west-2.amazonaws.com/lab56.template, then click Next:

Click Next on the next 2 screens, then paste in the OU ID and select All regions (or just US East (N. Virginia)). Then Next > Submit:

It took me about 3 minutes for the template to update. As a reminder, this new template is the same as the previous two, but with the new permissions added and some unneeded ones removed.
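(For reference, the same StackSet update can be scripted. This is a hedged sketch with placeholder values; run it from whichever account manages the StackSet, and add CallAs='DELEGATED_ADMIN' if that’s a delegated administrator rather than the management account:)

import boto3

cfn = boto3.client('cloudformation')
cfn.update_stack_set(
    StackSetName='AutoremediationRole',
    TemplateURL='https://cloudslaw.s3-us-west-2.amazonaws.com/lab56.template',
    Capabilities=['CAPABILITY_NAMED_IAM'],
    DeploymentTargets={'OrganizationalUnitIds': ['ou-xxxx-xxxxxxxx']},  # your OU ID
    Regions=['us-east-1']
)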

Now it’s time to test things out. We’ll do this in the Production1 account, so we need to switch over to that.

Start in your Sign-in portal > CloudSLAW > AdministratorAccess. We need to go back to our Production1 account and change a tag on our bucket to create a sample event to work with. (If you still have one in your email, you can use that and skip this part.) If you still have the account in your Role history, just click it. If not, here are the screenshots for switching roles (which is just assumerole from your AdministratorAccess role into the OrganizationAccountAccessRole in that account… yep, a simple role chain!).

That was just a reminder in case you didn’t have the account in your Role history.

We need to make two changes to ensure we can test the remediations, then we’ll change the tag on the bucket to trigger the autoremediation and see it in action. I’m going to describe what to do and then show key screenshots (with a scripted alternative after the list); this should be easy to follow:

  • Disable block public access

  • Change Object Ownership to ACLs enabled
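If you’d rather script those two setup changes than click through the console, here’s a boto3 sketch (substitute your test bucket’s name):

import boto3

# Requires credentials for the Production1 account.
s3 = boto3.client('s3')
bucket = 'your-test-bucket-name'  # substitute your test bucket

# 1. Disable Block Public Access on the bucket
s3.delete_public_access_block(Bucket=bucket)

# 2. Re-enable ACLs by moving Object Ownership off BucketOwnerEnforced
s3.put_bucket_ownership_controls(
    Bucket=bucket,
    OwnershipControls={'Rules': [{'ObjectOwnership': 'ObjectWriter'}]}
)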

Time to test! Go to Properties > Tags > Edit > add a tag with the key “classification” and value “sensitive”:

Then go back to Permissions and refresh the page in about 15 seconds. You should now see that BPA is enabled, there’s a restrictive bucket policy, and ACLs are disabled!!!
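(If you want a second opinion beyond the console, a quick boto3 check from a shell with Production1 credentials shows the same thing; substitute your bucket name:)

import json
import boto3

# Requires credentials for the Production1 account.
s3 = boto3.client('s3')
bucket = 'your-test-bucket-name'  # substitute your test bucket

print(s3.get_public_access_block(Bucket=bucket)['PublicAccessBlockConfiguration'])
print(s3.get_bucket_ownership_controls(Bucket=bucket)['OwnershipControls'])
print(json.dumps(json.loads(s3.get_bucket_policy(Bucket=bucket)['Policy']), indent=2))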

I’m sorry, this is just really freaking cool! You have just implemented an advanced, reusable pattern for security autoremediation which operates from a central account!

Lab Key Points

  • This is cool

  • I write too much sometimes

-Rich
