Thursday 28 December 2023

Low-level investigation of AWS VPC Flow Logs and CloudTrail for Security Group insights

Amazon VPC Flow Logs capture network traffic information. They can be enabled at the VPC level, the subnet level, or the network interface level.

VPC Flow Logs give us clear visibility into which destinations were accessed, on which port, and whether each request was accepted or rejected. There are plenty of AWS tools (GuardDuty, Security Lake) that provide insights based on VPC Flow Logs, and this blog has no intention of replacing them.

I will try to show how you can get relevant insights by querying the data directly.

We will discuss several use cases:

1. Find out which sources attempted to access my EC2 instance, and how many times.

2. Understand whether a Security Group was modified.

3. Find out if someone attached a new Security Group.

4. Find unused Security Groups.

5. Attempt to find unused Security Group rules.

First of all, I enabled the VPC flow log at the subnet level, on the subnet that will host my EC2 instances. You can do this in the VPC service console, in the subnets section.


The flow logs will be delivered to an S3 bucket.

If you check the fields that are stored in the VPC flow logs, you will notice that there is limited information about which EC2 instance was the destination or which security group (you can treat a security group as a firewall) processed the request. The only thing that is available is the instance ID. This can be problematic: if you terminate the instance and start a new one, the instance ID changes. The same is true for the network interface ID.

So consider that you have a fleet of EC2 Spot instances used in your EKS workload. They are reclaimed all the time, and the scaler is constantly creating new instances. If you want to focus your investigation on this workload, that is a problem: you need to know which records in the flow log are related to this fleet, and as I mentioned, interface_id and instance_id are constantly changing.

The way to establish the relationship is to use the same tag when starting the instances. For example, you have a workload that processes orders, and several pods are responsible for the payment process. Depending on the load you may have 2 pods or 10 pods, but they are all responsible for payments.

To relate them all together, always start them as a fleet of instances identified by the same name. It can be "payment" or, as I chose for this blog, "TestVPCLogs."

But how exactly do I connect the metadata of my instance to the flow log? For this, I will use another AWS logging service called CloudTrail. This service records all API calls performed in your AWS account. To enable it, go to the CloudTrail service console and create a new trail that sends the information to S3.


Each time you start a new instance, the information will be stored in S3.

The information looks like this:


CloudTrail data is JSON, while the VPC Flow Logs data is single-line textual data. So how do you query the data and correlate between them?

I will use the Athena service, which can treat S3 objects as database sources in different formats. For this, we first need to define the tables that treat an S3 location as a database and its objects as table records.

There are two ways to do it.

First, use the AWS Glue service, which can scan the S3 location that you point it to and create a table out of the data it finds. We will use it to create the flow log table.


Create and run the crawler, and the table will be created automatically. However, you will have to run the crawler again whenever a new partition is added to the table so that the new data becomes queryable.
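
If you prefer not to re-run the crawler for every new day of logs, you can also register new partitions directly from Athena. The statement below is only a sketch: the partition key names (partition_0 .. partition_2) and the bucket/prefix are assumptions based on how Glue typically names crawler-generated partitions (in my case the crawler named the table vpc_flow_logseu_central_1), so check your own table definition first.

-- Hypothetical example: register one new day of flow logs without re-running the crawler.
-- Partition key names and the S3 location below are placeholders - adjust them to your table.
ALTER TABLE vpc_flow_logseu_central_1 ADD IF NOT EXISTS
PARTITION (partition_0 = '2023', partition_1 = '12', partition_2 = '28')
LOCATION 's3://my-flow-logs-bucket/AWSLogs/123456789012/vpcflowlogs/eu-central-1/2023/12/28/'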

Second, you can also add the table manually. Just go to the Athena service, choose the database where you want to populate the table, and run the following DDL statement:

CREATE EXTERNAL TABLE cloudtrail_logs_security_investigation (
    eventVersion STRING,
    userIdentity STRUCT<
        type: STRING,
        principalId: STRING,
        arn: STRING,
        accountId: STRING,
        invokedBy: STRING,
        accessKeyId: STRING,
        userName: STRING,
        sessionContext: STRUCT<
            attributes: STRUCT<
                mfaAuthenticated: STRING,
                creationDate: STRING>,
            sessionIssuer: STRUCT<
                type: STRING,
                principalId: STRING,
                arn: STRING,
                accountId: STRING,
                username: STRING>,
            ec2RoleDelivery: STRING,
            webIdFederationData: MAP<STRING,STRING>>>,
    eventTime STRING,
    eventSource STRING,
    eventName STRING,
    awsRegion STRING,
    sourceIpAddress STRING,
    userAgent STRING,
    errorCode STRING,
    errorMessage STRING,
    requestParameters STRING,
    responseElements STRING,
    additionalEventData STRING,
    requestId STRING,
    eventId STRING,
    resources ARRAY<STRUCT<
        arn: STRING,
        accountId: STRING,
        type: STRING>>,
    eventType STRING,
    apiVersion STRING,
    readOnly STRING,
    recipientAccountId STRING,
    serviceEventDetails STRING,
    sharedEventID STRING,
    vpcEndpointId STRING,
    tlsDetails STRUCT<
        tlsVersion: STRING,
        cipherSuite: STRING,
        clientProvidedHostHeader: STRING>
)
COMMENT 'CloudTrail table for security-investigation bucket'
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://security-investigation-xxxxxxx/AWSLogs/621011111111/CloudTrail/'
TBLPROPERTIES ('classification'='cloudtrail');




After running the AWS Glue crawler and manually executing the CREATE TABLE statement, you should see both tables in Athena.
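
As a quick sanity check that the flow log table is queryable, you can run something like this (the table name vpc_flow_logseu_central_1 is the one the crawler created in my account, and the action column assumes the default flow log format):

-- Sanity check: the flow log table should return the fields we rely on later.
select interface_id, srcaddr, dstaddr, dstport, action
from vpc_flow_logseu_central_1
limit 10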



 

I named my EC2 instance "TestVPCLogs" and I will use this name in the query for filtering.
The event that we want to catch is the "RunInstances" event.

All the events are stored in CloudTrail as JSON, so first of all we need to convert the JSON into a relational structure.

For this you need to get familiar with Athena's JSON and array support.
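
As a minimal standalone illustration of the functions used below: json_extract_scalar pulls a value out of a JSON document by path, and UNNEST turns a JSON array into rows.

-- Toy example of Athena (Trino) JSON and array handling, runnable as-is.
SELECT json_extract_scalar(item, '$.port') AS port
FROM (VALUES JSON '[{"port":"22"},{"port":"443"}]') AS j(doc)
CROSS JOIN UNNEST(CAST(doc AS ARRAY(JSON))) AS t(item)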

Our first investigation will focus on port 22. We want to understand who tried to access our instance on port 22, and how many times.

In the query below we join the CloudTrail data with the VPC flow logs data.

with instances_details as (
    -- One row per launched instance, taken from the RunInstances CloudTrail events:
    -- instance id, its first network interface, its Name tag and its first security group.
    select eventname,
           cast(element_at(instances, 'instanceId') AS VARCHAR) instance_id,
           json_extract_scalar(cast(element_at(instances, 'networkInterfaceSet') AS JSON), '$.items[0].networkInterfaceId') interface_id,
           json_extract_scalar(cast(element_at(instances, 'tagSet') AS JSON), '$.items[0].value') instance_name,
           json_extract_scalar(cast(element_at(instances, 'networkInterfaceSet') AS JSON), '$.items[0].groupSet.items[0].groupId') sg_name
    from cloudtrail_logs_security_investigation r,
         unnest(
               CAST(
                       json_extract(r.responseelements, '$.instancesSet.items') as ARRAY(MAP(VARCHAR, JSON))
                   )
                ) AS t(instances)
    where 1=1
    and eventsource = 'ec2.amazonaws.com'
    and eventname like 'RunInstances'
)
-- Join the instance metadata with the flow log records by network interface id
-- and count the access attempts on port 22 per source address.
select instance_id,
       instances_details.interface_id,
       srcaddr,
       count(*) ssh_access_attempt,
       sg_name
from instances_details left outer join
     vpc_flow_logseu_central_1 vpc_logs on instances_details.interface_id = vpc_logs.interface_id
where instances_details.instance_name = 'TestVPCLogs'
and dstport = 22
group by instance_id,
         instances_details.interface_id,
         srcaddr,
         sg_name
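
If you only care about attempts that were actually blocked, a simpler variation (assuming the default flow log format, which includes the action column) looks like this:

-- Count only rejected connection attempts on port 22, per source address.
select srcaddr, count(*) rejected_ssh_attempts
from vpc_flow_logseu_central_1
where dstport = 22
and action = 'REJECT'
group by srcaddr
order by 2 desc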





We want to understand if a security group was modified. Why? Because it lets us detect when someone adds a new port, changes the source CIDR to 0.0.0.0/0, or opens access to a source that is part of a well-known block list.


select from_iso8601_timestamp(eventtime),
       cast(json_extract(requestparameters, '$.ModifySecurityGroupRulesRequest.SecurityGroupRule.SecurityGroupRule.CidrIpv4') as varchar) AS new_ip_range,
       cast(json_extract(requestparameters, '$.ModifySecurityGroupRulesRequest.SecurityGroupRule.SecurityGroupRule.ToPort') as varchar) AS new_port_range,
       cast(json_extract(requestparameters, '$.ModifySecurityGroupRulesRequest.SecurityGroupRule.SecurityGroupRule.IpProtocol') as varchar) AS new_protocol,
       cast(json_extract(requestparameters, '$.ModifySecurityGroupRulesRequest.GroupId')  as varchar) AS sg_name
from cloudtrail_logs_security_investigation
where eventname ='ModifySecurityGroupRules'
order by 1 desc
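
If the specific concern is someone opening a rule to the whole internet, the same JSON paths can be filtered directly. A sketch reusing the paths from the query above:

-- Show only modifications that set the source CIDR to 0.0.0.0/0.
select from_iso8601_timestamp(eventtime) AS event_time,
       cast(json_extract(requestparameters, '$.ModifySecurityGroupRulesRequest.GroupId') as varchar) AS sg_name
from cloudtrail_logs_security_investigation
where eventname = 'ModifySecurityGroupRules'
and json_extract_scalar(requestparameters, '$.ModifySecurityGroupRulesRequest.SecurityGroupRule.SecurityGroupRule.CidrIpv4') = '0.0.0.0/0'
order by 1 desc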



Find out if someone has attached a new security group.


select from_iso8601_timestamp(eventtime),
cast(json_extract(requestparameters, '$.networkInterfaceId') as varchar) AS interface_id,
cast(json_extract(requestparameters, '$.groupSet.items[0].groupId') as varchar) AS new_security_group
from cloudtrail_logs_security_investigation
where 1=1
and eventname = 'ModifyNetworkInterfaceAttribute'
and eventsource='ec2.amazonaws.com'
order by 1 desc



Get all the security groups that were defined but never used. How do we get them? We search for security groups that have rules but have never been attached to any network interface.

-- Security groups that have at least one ingress rule
-- but were never attached to any network interface.
select distinct sg_group_id
from
(
    -- every security group that ever received an ingress rule
    select cast(json_extract(requestparameters, '$.groupId') as varchar) AS sg_group_id
    from cloudtrail_logs_security_investigation
    where 1=1
    and eventname = 'AuthorizeSecurityGroupIngress'
    and eventsource = 'ec2.amazonaws.com'
) cl
where 1=1
and not exists (
                 -- every security group that was attached to a network interface,
                 -- either at interface creation time or later via a modification
                 select sg_group_id from
                 (
                    select cast(json_extract(requestparameters, '$.groupSet.items[0].groupId') as varchar) AS sg_group_id
                    from cloudtrail_logs_security_investigation
                    where 1=1
                    and eventname = 'CreateNetworkInterface'
                    and eventsource = 'ec2.amazonaws.com'
                    union
                    select cast(json_extract(requestparameters, '$.groupSet.items[0].groupId') as varchar) AS sg_group_id
                    from cloudtrail_logs_security_investigation
                    where 1=1
                    and eventname = 'ModifyNetworkInterfaceAttribute'
                    and eventsource = 'ec2.amazonaws.com'
                 ) cl2
                 where cl.sg_group_id = cl2.sg_group_id
                 and cl2.sg_group_id <> ''
               )
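
As a follow-up, it can be useful to know when each of these candidates was created and by whom. A sketch based on the CreateSecurityGroup event (the new group id is returned in responseElements):

-- When and by whom each security group was created.
select from_iso8601_timestamp(eventtime) AS created_at,
       useridentity.arn AS created_by,
       cast(json_extract(responseelements, '$.groupId') as varchar) AS sg_group_id
from cloudtrail_logs_security_investigation
where eventname = 'CreateSecurityGroup'
and eventsource = 'ec2.amazonaws.com'
order by 1 desc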



Next, we will search for the opposite: security groups that have rules and are attached to some network interface, so that we can check which of those rules are actually exercised by traffic.

The purpose is to find unused rules, which are either plain garbage or rules that someone managed to inject but has not yet been able to use.

To keep the SQL readable, I created this part as a view.

create or replace view sg_rules_and_eni_v as
select ace.sg_group_id,
       ace.interface_id,
       all_sg_rules.from_port,
       all_sg_rules.to_port,
       sg_group_rule_id
from
(   -- network interfaces and the security group attached to them,
    -- either at creation time or via a later modification
    select cast(json_extract(requestparameters, '$.groupSet.items[0].groupId') as varchar) AS sg_group_id,
           cast(json_extract(requestparameters, '$.networkInterfaceId') as varchar) interface_id
    from cloudtrail_logs_security_investigation
    where 1=1
    and eventname = 'CreateNetworkInterface'
    and eventsource = 'ec2.amazonaws.com'
    union
    select cast(json_extract(requestparameters, '$.groupSet.items[0].groupId') as varchar) AS sg_group_id,
           cast(json_extract(requestparameters, '$.networkInterfaceId') as varchar) interface_id
    from cloudtrail_logs_security_investigation
    where 1=1
    and eventname = 'ModifyNetworkInterfaceAttribute'
    and eventsource = 'ec2.amazonaws.com'
) ace join
(   -- all ingress rules, one row per rule, extracted from the AuthorizeSecurityGroupIngress response
    select cast(element_at(sg_rules, 'securityGroupRuleId') AS VARCHAR) sg_group_rule_id,
           cast(element_at(sg_rules, 'groupId') AS VARCHAR) sg_group_id,
           cast(element_at(sg_rules, 'fromPort') AS INT) from_port,
           cast(element_at(sg_rules, 'toPort') AS INT) to_port
    from cloudtrail_logs_security_investigation r,
         unnest(
               CAST(
                       json_extract(r.responseelements, '$.securityGroupRuleSet.items') as ARRAY(MAP(VARCHAR, JSON))
                   )
                ) AS t(sg_rules)
    where 1=1
    and eventsource = 'ec2.amazonaws.com'
    and eventname = 'AuthorizeSecurityGroupIngress'
) all_sg_rules on ace.sg_group_id = all_sg_rules.sg_group_id
where ace.sg_group_id <> ''
and ace.interface_id <> ''
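
Once the view is in place, you can take a quick look at the rule/interface pairs it produces:

-- Peek at the output of the view: one row per (security group rule, network interface) pair.
select * from sg_rules_and_eni_v limit 20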

Once I had this information, I "left joined" it with the VPC flow logs data. The intuition is that if I define a rule on, say, port 3, but that port never appears in the flow log records, the rule was never used.

At this point I found that once you define a security group rule, even if you don't explicitly run any traffic, you still get some records in the VPC flow log related to the ports in the rule, which makes it hard to understand SG rule utilization.

I am not sure why we have this traffic. In the meantime, I tried to eliminate this "noise" with heuristics. For example, I filtered out all the records that originated from the 167.x range, since it is some internal AWS address. I could also filter by byte size, but I am not sure how reliable that is.


-- Left join the view with the flow log records so that rules with no matching
-- traffic still show up (with NULL flow log columns).
select *
from sg_rules_and_eni_v srv
left outer join (select distinct interface_id,
                                 dstport,
                                 srcaddr,
                                 dstaddr
                 from vpc_flow_logseu_central_1
                 where srcaddr not like '167%'      -- heuristic: drop internal AWS "noise"
                 and srcaddr not like '162%'
                 and dstaddr = '10.0.13.213'        -- my EC2 instance
                ) vfl on srv.interface_id = vfl.interface_id
                      -- a flow record matches a rule when its destination port
                      -- falls inside the rule's port range
                      and srv.from_port <= cast(vfl.dstport as int)
                      and srv.to_port >= cast(vfl.dstport as int)
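
Building on the query above, keeping only the rows where the flow log side is NULL gives the candidate list of rules that never matched any traffic (still subject to the "noise" caveat):

-- Only the rules that never matched any flow record: candidates for cleanup.
select srv.sg_group_id,
       srv.sg_group_rule_id,
       srv.from_port,
       srv.to_port
from sg_rules_and_eni_v srv
left outer join (select distinct interface_id, dstport
                 from vpc_flow_logseu_central_1
                 where srcaddr not like '167%'
                 and srcaddr not like '162%'
                 and dstaddr = '10.0.13.213'
                ) vfl on srv.interface_id = vfl.interface_id
                      and srv.from_port <= cast(vfl.dstport as int)
                      and srv.to_port >= cast(vfl.dstport as int)
where vfl.dstport is null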





Sunday 15 October 2023

S3 cost estimation for multi-tenant bucket - storage

Having a single S3 bucket to hold assets from multiple tenants is a common practice in many organizations. Each tenant gets a dedicated prefix in the bucket, and this is where the story gets complicated.

In AWS, you can see the cost of a bucket but not of a single prefix. So how can you estimate the cost? My blog post, which was published on AWS, explains the steps you can take to estimate it.

Follow it here.

I am working on the same concept for API calls and will post it as soon as I finish.

Tuesday 3 October 2023

Using Mongoose with Amazon Document DB

We live in 2023, and unless it is absolutely necessary, no one calls the DB layer directly. We use some form of ORM for every modern language and database.

The same is true for MongoDB, the popular document database. There is a project called "Mongoose" that creates an abstraction layer above the database. The technology has been around for some time, and we know that it works well.

But what about Amazon DocumentDB? If you are new to DocumentDB, you should know that it is Amazon proprietary technology: it doesn't run the Mongo engine but is based on technology and concepts developed in-house by Amazon. DocumentDB emulates the MongoDB API, so basically every tool and driver that you can use with MongoDB you can also use with DocumentDB. There are still some differences, but all major features are supported.

In this post, we will check if Mongoose can work with DocumentDB.

The code and instructions are based on two other documents:

1. Getting started with Mongoose and MongoDB

2. Connect to DocumentDB from EC2.

I started by creating the DocumentDB database according to link 2 above.

My cluster looks like this:


It is very important to make sure that the database security group has an inbound rule allowing port 27017.




The DocumentDB cluster is deployed into a VPC, and there is no public access. That doesn't mean it cannot be reached from the internet, but the easiest way to access it is to create an EC2 instance in the same VPC. I created one running Amazon Linux 2023.



You need to make sure that the EC2 instance has a role that allows access to DocumentDB.




and the role looks like this:


Now we are ready to write the application.



I created a Node application according to Link 1 above.

Don't forget to modify the package.json file and add "type": "module", since the code below uses ES module imports.



You also need the global-bundle.pem file, since we use a TLS connection to DocumentDB.

You can get it by running:

wget https://truststore.pki.rds.amazonaws.com/global/global-bundle.pem

The content of the files:

db.js (note that the connection string is masked)
========================



import mongoose from "mongoose";

export default function connectDB() {
  const url = "mongodb://xxxaws.com:27017/?tls=true&tlsCAFile=global-bundle.pem&retryWrites=false";

  try {
    mongoose.connect(url, {
      useNewUrlParser: true,
      useUnifiedTopology: true,
      maxPoolSize: 30
    });
  } catch (err) {
    console.error(err.message);
    process.exit(1);
  }
  const dbConnection = mongoose.connection;
  dbConnection.once("open", (_) => {
    console.log(`Database connected: ${url}`);
  });

  dbConnection.on("error", (err) => {
    console.error(`connection error: ${err}`);
  });
  return;
}

Blog.js
======================
import mongoose from 'mongoose';
const { Schema, model } = mongoose;

const blogSchema = new Schema({
  title: String,
  slug: String,
  published: Boolean,
  author: String,
  content: String,
  tags: [String],
  createdAt: Date,
  updatedAt: Date,
  comments: [{
    user: String,
    content: String,
    votes: Number
  }]
});

const Blog = model('Blog', blogSchema);
export default Blog;

index.js

=========================

import express from "express";
import connectDB from "./config/db.js";
import Blog from './model/Blog.js';

const app = express();
const PORT = 3000;

app.use(express.json());
app.use(express.urlencoded({ extended: true }));

connectDB();


// Create a new blog post object
const article = new Blog({
  title: 'Using mongoose with Amazon Document DB',
  slug: 'learnfrommike',
  published: true,
  content: 'Testing mongoose with Amazon Document DB',
  tags: ['Mongoose', 'Amazon DocumentDB'],
});

// Insert the article in our DocumentDB database
await article.save();

const firstArticle = await Blog.findOne({});
console.log(firstArticle);

// You can modify this function to return the Article object instead of
// printing it to the console
app.get("/", (request, response) => {
  response.send({ message: "Hello from an Express API!" });
});

app.listen(PORT, () => {
  console.log(`Server running at http://localhost:${PORT}`);
});


If we did everything correctly, we should see the content of the "Article" object being returned from DocumentDB.





The result is good. So Mongoose is compatible with DocumentDB.
Enjoy!