Amazon VPC Flow Logs capture information about the network traffic in your VPC. They can be enabled at the VPC level, subnet level, or network interface level.
With the help of VPC flow logs, we get clear visibility into which destinations were accessed and on which port. Additionally, we can see whether each request was accepted or rejected. Several AWS services (GuardDuty, Security Lake) provide insights based on VPC flow logs. This blog does not intend to replace them.
Instead, I will show how you can get relevant insights by querying the data directly.
We will discuss several use cases:
1. Find out which sources attempted to access my EC2 instance, and how many times.
2. Understand whether a Security Group was modified.
3. Find out whether someone attached a new Security Group.
4. Find unused Security Groups.
5. Attempt to find unused Security Group rules.
For this, I first enabled VPC flow logs at the subnet level. This is the subnet that will host my EC2 instances. You can do it in the VPC service console, in the subnet section.
The flow logs are delivered to an S3 bucket.
If you check the fields that are stored in the VPC flow logs, you will notice that they carry limited information about which EC2 instance was the destination or which security group (you can think of a security group as a firewall) was evaluated for the request. The only identifier available is the instance ID. This can be problematic: if you terminate the instance and start a new one, the instance ID changes. The same is true for the network interface ID.
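To make the field layout concrete, here is a minimal Python sketch that splits one flow log record into named fields. It assumes the default (version 2) record format. A custom format, for example one that adds the instance ID, would need a different field list. The sample record and the function name parse_flow_log_record are made up for illustration.

```python
# Field order of the default (version 2) VPC flow log record format.
DEFAULT_FIELDS = [
    "version", "account_id", "interface_id", "srcaddr", "dstaddr",
    "srcport", "dstport", "protocol", "packets", "bytes",
    "start", "end", "action", "log_status",
]

def parse_flow_log_record(line):
    """Split one space-separated record into a dict keyed by field name."""
    values = line.split()
    if len(values) != len(DEFAULT_FIELDS):
        raise ValueError(
            f"expected {len(DEFAULT_FIELDS)} fields, got {len(values)}"
        )
    return dict(zip(DEFAULT_FIELDS, values))

# Made-up sample record; the account ID and interface ID are placeholders.
sample = (
    "2 123456789012 eni-0123456789abcdef0 203.0.113.12 10.0.13.213 "
    "49761 22 6 20 4249 1418530010 1418530070 REJECT OK"
)
record = parse_flow_log_record(sample)
print(record["interface_id"], record["dstport"], record["action"])
```

Note that nothing in the record names the instance or the security group, which is why the rest of this post correlates the interface ID with CloudTrail data.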
Consider a fleet of EC2 Spot instances that run your EKS workload. They are reclaimed all the time, and the scaler is constantly creating new instances. If you want to focus your investigation on this workload, this is very problematic: you need to know which records in the flow log are related to this fleet, and, as I mentioned, interface_id and instance_id are constantly changing.
The way to establish the relationship is to use the same tag when starting the instances. For example, you have a workload that processes orders, and you have several pods that are responsible for the payment process. Depending on the load, you may have 2 pods or 10 pods, but they are all responsible for payment.
To relate them all together, always run them on a fleet of instances that are identified by the same name. It can be "payment" or, as I chose for this blog, "TestVPCLogs."
But how exactly do I connect the metadata of my instance to the flow log? For this, I will use another AWS logging service called CloudTrail. This service is responsible for logging the API calls that are made in your AWS account. To enable CloudTrail, go to the CloudTrail service console and create a new trail that sends the information to S3.
Each time you start a new instance, the event will be stored in S3.
The information looks like this:
It is JSON data, while the VPC flow logs data is single-line text. So how do you query both and correlate them?
I will use the Athena service, which can query S3 objects in different formats as if they were database tables. For this, we first need to define tables that treat an S3 location as a table and the objects in it as table records.
There are two ways to do it.
First, use the AWS Glue service, which can scan the S3 location that you point it to and create a table out of the data it finds. We will use it to create the flow log table.
Create and run the crawler, and the table will be created automatically. However, you will have to run the crawler again whenever a new partition is added, so that the new data becomes visible in the table.
Second, you can also add the table manually. Just go to the Athena service, choose the database where you want to create the table, and run the following DDL statement:
CREATE EXTERNAL TABLE cloudtrail_logs_security_investigation (
    eventVersion STRING,
    userIdentity STRUCT<
        type: STRING,
        principalId: STRING,
        arn: STRING,
        accountId: STRING,
        invokedBy: STRING,
        accessKeyId: STRING,
        userName: STRING,
        sessionContext: STRUCT<
            attributes: STRUCT<
                mfaAuthenticated: STRING,
                creationDate: STRING>,
            sessionIssuer: STRUCT<
                type: STRING,
                principalId: STRING,
                arn: STRING,
                accountId: STRING,
                username: STRING>,
            ec2RoleDelivery: STRING,
            webIdFederationData: MAP<STRING,STRING>>>,
    eventTime STRING,
    eventSource STRING,
    eventName STRING,
    awsRegion STRING,
    sourceIpAddress STRING,
    userAgent STRING,
    errorCode STRING,
    errorMessage STRING,
    requestParameters STRING,
    responseElements STRING,
    additionalEventData STRING,
    requestId STRING,
    eventId STRING,
    resources ARRAY<STRUCT<
        arn: STRING,
        accountId: STRING,
        type: STRING>>,
    eventType STRING,
    apiVersion STRING,
    readOnly STRING,
    recipientAccountId STRING,
    serviceEventDetails STRING,
    sharedEventID STRING,
    vpcEndpointId STRING,
    tlsDetails STRUCT<
        tlsVersion: STRING,
        cipherSuite: STRING,
        clientProvidedHostHeader: STRING>
)
COMMENT 'CloudTrail table for security-investigation bucket'
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://security-investigation-xxxxxxx/AWSLogs/621011111111/CloudTrail/'
TBLPROPERTIES ('classification'='cloudtrail');
After running the AWS Glue crawler and manually executing the CREATE TABLE statement, you should have the following in Athena.
I named my EC2 instance "TestVPCLogs", and I will use this name in the query for filtering.
The event that we want to catch is the "RunInstances" event.
All the events are stored in CloudTrail as JSON, so first of all, we need to convert the JSON to a relational structure.
For this, you need to get familiar with Athena's JSON and array support.
Our first investigation will focus on port 22. We want to understand who tried to access our instance on port 22, and how many times.
In the query below, we join the CloudTrail data with the VPC flow logs data.
with instances_details as (
    select
        eventname,
        cast(element_at(instances, 'instanceId') AS VARCHAR) instance_id,
        json_extract_scalar(cast(element_at(instances, 'networkInterfaceSet') AS JSON), '$.items[0].networkInterfaceId') interface_id,
        json_extract_scalar(cast(element_at(instances, 'tagSet') AS JSON), '$.items[0].value') instance_name,
        json_extract_scalar(cast(element_at(instances, 'networkInterfaceSet') AS JSON), '$.items[0].groupSet.items[0].groupId') sg_name
    from cloudtrail_logs_security_investigation r,
        unnest(
            CAST(json_extract(r.responseelements, '$.instancesSet.items') as ARRAY(MAP(VARCHAR, JSON)))
        ) AS t(instances)
    where 1=1
        and eventsource = 'ec2.amazonaws.com'
        and eventname like 'RunInstances'
)
select
    instance_id,
    instances_details.interface_id,
    srcaddr,
    count(*) ssh_access_attempt,
    sg_name
from instances_details
left outer join vpc_flow_logseu_central_1 vpc_logs
    on instances_details.interface_id = vpc_logs.interface_id
where instances_details.instance_name = 'TestVPCLogs'
    and dstport = 22
group by instance_id, instances_details.interface_id, srcaddr, sg_name
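If the JSON paths in the first part of the query are hard to follow, the Python sketch below performs the same flattening on one event. The sample responseElements document is hypothetical and heavily trimmed: it keeps only the keys that the query reads, and all IDs are placeholders. The function name instance_details is also just for illustration.

```python
import json

# Hypothetical, trimmed responseElements of a RunInstances event.
# Only the keys read by the query are kept; all IDs are placeholders.
response_elements = json.dumps({
    "instancesSet": {"items": [{
        "instanceId": "i-0aaaaaaaaaaaaaaaa",
        "tagSet": {"items": [{"key": "Name", "value": "TestVPCLogs"}]},
        "networkInterfaceSet": {"items": [{
            "networkInterfaceId": "eni-0bbbbbbbbbbbbbbbb",
            "groupSet": {"items": [{"groupId": "sg-0cccccccccccccccc"}]},
        }]},
    }]}
})

def instance_details(raw):
    """Return one flat row per launched instance, like the unnest in the query."""
    rows = []
    for inst in json.loads(raw)["instancesSet"]["items"]:
        eni = inst["networkInterfaceSet"]["items"][0]  # first interface only
        rows.append({
            "instance_id": inst["instanceId"],
            "interface_id": eni["networkInterfaceId"],
            "instance_name": inst["tagSet"]["items"][0]["value"],  # first tag only
            "sg_name": eni["groupSet"]["items"][0]["groupId"],  # first group only
        })
    return rows

rows = instance_details(response_elements)
print(rows)
```

Like the query, the sketch reads only the first tag, the first network interface, and the first security group, so an instance with several tags or groups would need extra handling.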
We want to understand whether the security group was modified. Why? This lets us monitor whether someone adds a new port, changes the source range to 0.0.0.0/0, or allows a source that is on a well-known blocklist.
select
    from_iso8601_timestamp(eventtime),
    cast(json_extract(requestparameters, '$.ModifySecurityGroupRulesRequest.SecurityGroupRule.SecurityGroupRule.CidrIpv4') as varchar) AS new_ip_range,
    cast(json_extract(requestparameters, '$.ModifySecurityGroupRulesRequest.SecurityGroupRule.SecurityGroupRule.ToPort') as varchar) AS new_port_range,
    cast(json_extract(requestparameters, '$.ModifySecurityGroupRulesRequest.SecurityGroupRule.SecurityGroupRule.IpProtocol') as varchar) AS new_protocol,
    cast(json_extract(requestparameters, '$.ModifySecurityGroupRulesRequest.GroupId') as varchar) AS sg_name
from cloudtrail_logs_security_investigation
where eventname = 'ModifySecurityGroupRules'
order by 1 desc
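Once you have the modified rules, one simple follow-up check is to flag any rule whose new range is open to the whole internet. The sketch below is one possible way to do that in Python with the standard ipaddress module. The input rows are hypothetical and only mimic the shape of the query output (sg_name, new_ip_range, new_port_range).

```python
import ipaddress

def is_open_to_world(cidr):
    """True when the CIDR covers every address, i.e. its prefix length is 0."""
    return ipaddress.ip_network(cidr, strict=False).prefixlen == 0

# Hypothetical rows shaped like the query output:
# (sg_name, new_ip_range, new_port_range). The group ID is a placeholder.
modified_rules = [
    ("sg-0cccccccccccccccc", "10.0.0.0/16", "443"),
    ("sg-0cccccccccccccccc", "0.0.0.0/0", "22"),
]

flagged = [row for row in modified_rules if is_open_to_world(row[1])]
print(flagged)
```

The same helper also treats the IPv6 range ::/0 as open, since it only looks at the prefix length.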
Find out if someone has attached a new security group.
select
    from_iso8601_timestamp(eventtime),
    cast(json_extract(requestparameters, '$.networkInterfaceId') as varchar) AS interface_id,
    cast(json_extract(requestparameters, '$.groupSet.items[0].groupId') as varchar) AS new_security_group
from cloudtrail_logs_security_investigation
where 1=1
    and eventname = 'ModifyNetworkInterfaceAttribute'
    and eventsource = 'ec2.amazonaws.com'
order by 1 desc
Get all the security groups that were defined but never used. How do we get them? We search for SGs that have rules but have never been attached to any network interface.
select sg_group_id
from (
    select
        from_iso8601_timestamp(eventtime),
        cast(json_extract(requestparameters, '$.groupId') as varchar) AS sg_group_id,
        current_date,
        current_time,
        eventname,
        *
    from cloudtrail_logs_security_investigation
    where 1=1
        and eventname = 'AuthorizeSecurityGroupIngress'
        and eventsource = 'ec2.amazonaws.com'
) cl
where 1=1
    and not exists (
        select sg_group_id
        from (
            select cast(json_extract(requestparameters, '$.groupSet.items[0].groupId') as varchar) AS sg_group_id
            from cloudtrail_logs_security_investigation
            where 1=1
                and eventname = 'CreateNetworkInterface'
                and eventsource = 'ec2.amazonaws.com'
            union
            select cast(json_extract(requestparameters, '$.groupSet.items[0].groupId') as varchar) AS sg_group_id
            from cloudtrail_logs_security_investigation
            where 1=1
                and eventname = 'ModifyNetworkInterfaceAttribute'
                and eventsource = 'ec2.amazonaws.com'
        ) cl2
        where cl.sg_group_id = cl2.sg_group_id
            and cl2.sg_group_id <> ''
    )
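The NOT EXISTS logic boils down to a set difference: groups that received ingress rules, minus groups that ever appeared in an attach event. Here is a minimal Python sketch of that idea. The group IDs are placeholders, and the function name unused_security_groups is only for illustration.

```python
def unused_security_groups(authorized, attached):
    """Groups that received ingress rules but were never attached to an interface."""
    attached_ids = {sg for sg in attached if sg}  # skip empty or missing IDs
    return sorted(set(authorized) - attached_ids)

# Placeholder IDs. "authorized" stands for AuthorizeSecurityGroupIngress events,
# "attached" for CreateNetworkInterface / ModifyNetworkInterfaceAttribute events.
authorized = ["sg-0aaa", "sg-0bbb", "sg-0bbb", "sg-0ccc"]
attached = ["sg-0bbb", "", None]

print(unused_security_groups(authorized, attached))
```

Unlike the query, which returns one row per AuthorizeSecurityGroupIngress event, the sketch returns each group only once.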
Next, we will try the opposite: we will search for SGs that have rules and are attached to some network interface.
The purpose is to find unused rules that are either garbage or that someone managed to inject but has not yet been able to use.
To keep the SQL readable, I created this part as a view.
create or replace view sg_rules_and_eni_v as
select
    ace.sg_group_id,
    ace.interface_id,
    all_sg_rules.from_port,
    all_sg_rules.to_port,
    sg_group_rule_id
from (
    select
        cast(json_extract(requestparameters, '$.groupSet.items[0].groupId') as varchar) AS sg_group_id,
        cast(json_extract(requestparameters, '$.networkInterfaceId') as varchar) interface_id
    from cloudtrail_logs_security_investigation
    where 1=1
        and eventname = 'CreateNetworkInterface'
        and eventsource = 'ec2.amazonaws.com'
    union
    select
        cast(json_extract(requestparameters, '$.groupSet.items[0].groupId') as varchar) AS sg_group_id,
        cast(json_extract(requestparameters, '$.networkInterfaceId') as varchar) interface_id
    from cloudtrail_logs_security_investigation
    where 1=1
        and eventname = 'ModifyNetworkInterfaceAttribute'
        and eventsource = 'ec2.amazonaws.com'
) ace
join (
    select
        cast(element_at(sg_rules, 'securityGroupRuleId') AS VARCHAR) sg_group_rule_id,
        cast(element_at(sg_rules, 'groupId') AS VARCHAR) sg_group_id,
        cast(element_at(sg_rules, 'fromPort') AS INT) from_port,
        cast(element_at(sg_rules, 'toPort') AS INT) to_port
    from cloudtrail_logs_security_investigation r,
        unnest(
            CAST(json_extract(r.responseelements, '$.securityGroupRuleSet.items') as ARRAY(MAP(VARCHAR, JSON)))
        ) AS t(sg_rules)
    where 1=1
        and eventsource = 'ec2.amazonaws.com'
        and eventname = 'AuthorizeSecurityGroupIngress'
) all_sg_rules
    on ace.sg_group_id = all_sg_rules.sg_group_id
where ace.sg_group_id <> ''
    and ace.interface_id <> ''
Once I had this information, I left joined it with the VPC flow logs. The intuition is that if I define a rule on port 3 but that port never appears in the flow log records, the rule was never used.
At this point I found that once you define a security group rule, even if you don't explicitly send any traffic, you still get some records in the VPC flow log for the ports in the SG rule, which makes it hard to understand SG rule utilization.
I am not sure why we have this traffic. In the meantime, I tried to eliminate this "noise" with a heuristic. For example, I filtered out all the records that originated from addresses starting with 167, since it is some internal AWS address. I could also filter by byte size; however, I am not sure how reliable that is.
select *
from sg_rules_and_eni_v srv
left outer join (
    select distinct interface_id, dstport, srcaddr, dstaddr
    from vpc_flow_logseu_central_1
    where srcaddr not like '167%'
        and srcaddr not like '162%'
) vfl
    on srv.interface_id = vfl.interface_id
    -- the observed destination port must fall inside the rule's port range
    and srv.from_port <= cast(vfl.dstport as int)
    and srv.to_port >= cast(vfl.dstport as int)
where vfl.dstaddr = '10.0.13.213' -- My EC2 instance
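The matching idea behind this join can be sketched in a few lines of Python: a rule counts as used when some observed destination port on the same interface falls inside the rule's port range, and as unused otherwise. This is only a sketch of the intuition, not the query itself. The rule IDs, interface IDs, and function names are placeholders.

```python
def rule_covers_port(from_port, to_port, dstport):
    """True when dstport lies inside the inclusive range [from_port, to_port]."""
    return from_port <= dstport <= to_port

def unused_rules(rules, observed):
    """Return IDs of rules with no matching traffic on their interface.

    rules:    list of (rule_id, interface_id, from_port, to_port)
    observed: set of (interface_id, dstport) pairs seen in the flow log
    """
    unused = []
    for rule_id, eni, from_port, to_port in rules:
        hit = any(
            eni == seen_eni and rule_covers_port(from_port, to_port, port)
            for seen_eni, port in observed
        )
        if not hit:
            unused.append(rule_id)
    return unused

# Placeholder data for illustration only.
rules = [
    ("sgr-0aaa", "eni-0bbb", 22, 22),
    ("sgr-0ccc", "eni-0bbb", 8000, 8100),
    ("sgr-0ddd", "eni-0bbb", 3306, 3306),
]
observed = {("eni-0bbb", 22), ("eni-0bbb", 8080), ("eni-0zzz", 3306)}

print(unused_rules(rules, observed))
```

In this sample only the rule for port 3306 is reported, because the traffic on 3306 was seen on a different interface. The background "noise" described above would still have to be filtered out of the observed pairs first.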