Monday, 7 August 2023

Amazon Aurora audit data with activity streams

Being able to record the activity executed in your database is a must-have in the modern technology landscape.

First of all, various regulations and compliance requirements call for an audit trail of database activity.

In addition, the audit data can be used for internal analytics, to understand usage patterns or detect anomalies.

Detection and prevention security tools can also rely on this data to identify and stop attacks on your application.

Amazon Aurora lets you capture this audit data with just one click. All you need to do is create the cluster and choose the "Start Activity Stream" option.

And this is exactly what I did.


I already clicked the button, which is why you see the stop option.
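The same action is also available through the API. Here is a minimal sketch, assuming a boto3 RDS client is passed in; the helper name `start_activity_stream`, the cluster ARN and the KMS key are placeholders of mine, not values from this setup.

```python
def start_activity_stream(rds_client, cluster_arn, kms_key_id, mode="async"):
    """Start a database activity stream and return (Kinesis stream name, status).

    rds_client is expected to behave like boto3.client("rds").
    cluster_arn and kms_key_id are placeholders supplied by the caller.
    """
    if mode not in ("sync", "async"):
        raise ValueError("mode must be 'sync' or 'async'")

    response = rds_client.start_activity_stream(
        ResourceArn=cluster_arn,   # ARN of the Aurora cluster
        Mode=mode,                 # 'async' favors database performance
        KmsKeyId=kms_key_id,       # key used to encrypt the stream
        ApplyImmediately=True,     # do not wait for the maintenance window
    )
    # The Kinesis stream is provisioned by RDS; its name comes back in the response
    return response["KinesisStreamName"], response["Status"]
```

With real credentials this would be called as `start_activity_stream(boto3.client("rds"), "<cluster ARN>", "<KMS key ID or alias>")`.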

Once you click to enable the stream, the data flows to a Kinesis Data Stream that was provisioned for you automatically. All the data is encrypted with a KMS key. It is up to you to decide what you want to do with the data.
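Each record on the stream is a JSON envelope. The two fields used later in this post are `databaseActivityEvents` (the encrypted payload) and `key` (the encrypted data key), both base64-encoded. The sketch below only shows how to split such an envelope; the sample is fabricated and `split_envelope` is a hypothetical helper name.

```python
import base64
import json


def split_envelope(raw_record: bytes):
    """Return (encrypted payload, encrypted data key) from one stream record."""
    envelope = json.loads(raw_record.decode("utf-8"))
    payload = base64.b64decode(envelope["databaseActivityEvents"])
    encrypted_key = base64.b64decode(envelope["key"])
    return payload, encrypted_key


# Fabricated envelope: real records carry ciphertext, not readable bytes
sample = json.dumps({
    "databaseActivityEvents": base64.b64encode(b"ciphertext").decode("ascii"),
    "key": base64.b64encode(b"wrapped-data-key").decode("ascii"),
}).encode("utf-8")

payload, encrypted_key = split_envelope(sample)
print(len(payload), len(encrypted_key))
```

Neither value is usable yet: the data key must first be decrypted with KMS, and only then can the payload be decrypted.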

The Amazon documentation suggests the following architecture:


It streams the data to S3 or to an external application. I decided to modify the architecture a little bit.


The data in the activity stream is encrypted, which is fine for a production use case. For this blog, however, I want to see the actual activity in S3. To achieve this, I will use the transformation Lambda of Kinesis Firehose to decrypt and decompress each message before it is stored in S3.
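A Firehose transformation Lambda receives a batch in `event['records']`, each item with a `recordId` and base64-encoded `data`. It must return every `recordId` with a `result` of `Ok`, `Dropped` or `ProcessingFailed`. Here is a minimal sketch of that contract alone, with no decryption; `transform` and `handler_sketch` are hypothetical names and the `keep` field exists only for illustration.

```python
import base64
import json


def transform(payload: dict):
    """Hypothetical per-record logic: return None to drop the record."""
    return payload if payload.get("keep", True) else None


def handler_sketch(event, context):
    output = []
    for record in event["records"]:
        result, data = "Ok", record["data"]
        try:
            payload = json.loads(base64.b64decode(record["data"]))
            transformed = transform(payload)
            if transformed is None:
                # Intentionally filtered out; Firehose does not deliver it
                result = "Dropped"
            else:
                data = base64.b64encode(
                    json.dumps(transformed).encode("utf-8")).decode("utf-8")
        except ValueError:
            # Bad base64 or bad JSON: hand the original data back as failed
            result = "ProcessingFailed"
        output.append({"recordId": record["recordId"],
                       "result": result,
                       "data": data})
    return {"records": output}
```

Every incoming `recordId` has to appear in the response, which is why dropped and failed records are still appended to the output.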

Please find below the Lambda code:

import base64
import json
import zlib
import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy
from aws_encryption_sdk.internal.crypto import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType
import boto3

enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT)

class MyRawMasterKeyProvider(RawMasterKeyProvider):
    provider_id = "BC"

    def __new__(cls, *args, **kwargs):
        obj = super(RawMasterKeyProvider, cls).__new__(cls)
        return obj

    def __init__(self, plain_key):
        RawMasterKeyProvider.__init__(self)
        self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
                                        wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC)

    def _get_raw_key(self, key_id):
        return self.wrapping_key


def decrypt_payload(payload, data_key):
    my_key_provider = MyRawMasterKeyProvider(data_key)
    my_key_provider.add_master_key("DataKey")
    decrypted_plaintext, header = enc_client.decrypt(
        source=payload,
        materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider))
    return decrypted_plaintext


def decrypt_decompress(payload, key):
    decrypted = decrypt_payload(payload, key)
    return zlib.decompress(decrypted, zlib.MAX_WBITS + 16)

def lambda_handler(event, context):
    output = []
    session = boto3.session.Session()
    kms = session.client('kms')
   
    # Resource ID of the Aurora cluster; it is part of the KMS encryption context
    RESOURCE_ID = 'cluster-XXXXXXXXXXXXX'
    for record in event['records']:
        
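        # Firehose delivers each record base64-encoded; the decoded value is a JSON envelope
        record_data_plain = base64.b64decode(record['data']).decode('utf-8')
        record_data = json.loads(record_data_plain)

        # The envelope carries the encrypted payload and the encrypted data key
        payload_decoded = base64.b64decode(record_data['databaseActivityEvents'])
        data_key_decoded = base64.b64decode(record_data['key'])

        # Ask KMS to decrypt the data key; the encryption context must match the cluster
        data_key_decrypt_result = kms.decrypt(CiphertextBlob=data_key_decoded,
                                              EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})

        # Decrypt the payload with the plaintext data key, then gunzip it
        plaintext = decrypt_decompress(payload_decoded, data_key_decrypt_result['Plaintext'])

        events = json.loads(plaintext)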
        
        # Filtering logic: remove heartbeat and rdsadmin events.
        # Iterate over a copy ([:]) so that removing items from the original list is safe.
        # If the list is empty, the loop body simply never runs.
        for dbEvent in events['databaseActivityEventList'][:]:
            if dbEvent['type'] == "heartbeat" or dbEvent.get('dbUserName') == "rdsadmin":
                events['databaseActivityEventList'].remove(dbEvent)
        
        # Records with nothing left after filtering are dropped on purpose,
        # so report them to Firehose as 'Dropped' rather than as failures
        result = 'Dropped'

        if events['databaseActivityEventList']:  # an empty list is falsy in Python
            result = 'Ok'
            print('Decrypted data below')
            print(json.dumps(events))
   
        # Do custom processing on the payload here

        output_record = {
            'recordId': record['recordId'],
            'result': result,
            'data': base64.b64encode(json.dumps(events).encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}


The code is based on the Amazon workshop for activity streams; I only modified it to run inside Lambda. Note that the code needs the AWS Encryption SDK package (aws_encryption_sdk), which I also took from the workshop and uploaded as a Lambda layer. You need at least Python 3.9.
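The filtering step does not depend on KMS or Firehose, so it can be pulled into a pure function and checked locally. Here is a sketch of that idea; `filter_events` is a hypothetical helper name and the sample events are fabricated.

```python
def filter_events(events: dict) -> dict:
    """Return a copy of the record without heartbeat and rdsadmin events."""
    kept = [
        e for e in events["databaseActivityEventList"]
        if e.get("type") != "heartbeat" and e.get("dbUserName") != "rdsadmin"
    ]
    # Build a new dict so the caller's input is left untouched
    return {**events, "databaseActivityEventList": kept}


# Fabricated sample with one event of each kind
sample = {
    "type": "DatabaseActivityMonitoringRecord",
    "databaseActivityEventList": [
        {"type": "heartbeat"},
        {"type": "record", "dbUserName": "rdsadmin", "command": "SELECT"},
        {"type": "record", "dbUserName": "postgres", "command": "CREATE ROLE"},
    ],
}

filtered = filter_events(sample)
print([e["command"] for e in filtered["databaseActivityEventList"]])
```

Unlike the in-place removal in the Lambda, this version builds a new list, which avoids mutating a list while looping over it.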

Next, use your favorite database client to run a DDL or DML statement:

And here is the result that I downloaded from S3 and opened with a JSON editor (I masked some data for security reasons):

{
  "type": "DatabaseActivityMonitoringRecord",
  "clusterId": "cluster-XXXXXXXXXXXX",
  "instanceId": "db-3TLYJL7WCFXYMCAXXXXXXXXA",
  "databaseActivityEventList": [
    {
      "logTime": "2023-08-07 19:22:14.534688+00",
      "statementId": 2,
      "substatementId": 1,
      "objectType": null,
      "command": "CREATE ROLE",
      "objectName": null,
      "databaseName": "postgres",
      "dbUserName": "postgres",
      "remoteHost": "172.31.43.XXXX",
      "remotePort": "33468",
      "sessionId": "64d14447.5a3b",
      "rowCount": null,
      "commandText": "CREATE USER davide WITH PASSWORD <REDACTED>",
      "paramList": [],
      "pid": 23099,
      "clientApplication": "psql",
      "exitCode": null,
      "class": "ROLE",
      "serverVersion": "14.7.1",
      "serverType": "PostgreSQL",
      "serviceName": "Amazon Aurora PostgreSQL-Compatible edition",
      "serverHost": "172.31.XXXXXXX",
      "netProtocol": "TCP",
      "dbProtocol": "Postgres 3.0",
      "type": "record",
      "startTime": "2023-08-07 19:22:14.533149+00",
      "errorMessage": null
    }
  ]
}
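To scan such a file without a JSON editor, a short script can list who ran what. This sketch assumes that one S3 object may hold several JSON documents written back to back with no separator, which is how Firehose concatenates records unless a delimiter is added; `iter_json_documents` and `summarize` are hypothetical names.

```python
import json


def iter_json_documents(text: str):
    """Yield each JSON document from a string of concatenated documents."""
    decoder = json.JSONDecoder()
    pos = 0
    while pos < len(text):
        # raw_decode does not skip leading whitespace, so do it here
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        doc, pos = decoder.raw_decode(text, pos)
        yield doc


def summarize(text: str):
    """Return (logTime, dbUserName, command) for every activity event."""
    rows = []
    for doc in iter_json_documents(text):
        for ev in doc.get("databaseActivityEventList", []):
            rows.append((ev.get("logTime"), ev.get("dbUserName"), ev.get("command")))
    return rows
```

Calling `summarize(open(path).read())` on a downloaded object, where `path` is wherever you saved it, returns one tuple per event.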






Be aware that once you enable the activity stream, it adds a small CPU overhead on the database.