Tuesday, 29 March 2022

Overview of some of the AWS NoSQL databases

 I did a deep dive into AWS NoSQL solutions, and I want to share my findings with you.

I looked at the following databases:

1. DynamoDB

2. DocumentDB

3. Keyspaces

4. OpenSearch


This is not a comparison, just a basic overview with useful links.

DynamoDB

  • AWS Managed – serverless with ACID support.
  • A transaction can target up to 25 distinct items in one or more DynamoDB tables within the same AWS account and the same Region. The aggregate size of the items in the transaction cannot exceed 4 MB.
  • You can create up to 256 tables per account per Region.
  • Records = items. Each item can have a collection of attributes. Item size is limited to 400 KB (including attribute names); see Sizing. There is no practical limit on a table's size.
  • Uses capacity units for throttling and pricing: RCUs for reads, WCUs for writes. A single partition supports up to 3,000 Read Capacity Units (RCUs) and up to 1,000 Write Capacity Units (WCUs) per second. This is a lot of capacity: it lets you read 12 MB of strongly consistent data or 24 MB of eventually consistent data per second, and write 1 MB of data per second.
  • Scalable and highly available
  • Uses Streams for event-driven development and to log DB events. A stream emits an event on every table modification, like triggers in RDS, and preserves the modification order. It can invoke a Lambda function to pass stream events on for further processing (always use try/catch, since Lambda keeps retrying when an exception is thrown). Each Streams read request unit can return up to 1 MB of data.
  • Access control: you can control full access to the table, attribute (column) access, or item access. See Access Control.
  • Partition Key: defined on one attribute. Think about it as a bucket in S3. The partition key is hashed to choose the partition, and the data is replicated. When the partition key is the whole primary key, its values must be unique; they can repeat only when a sort key is also defined.
  • Partition Key + Sort Key: defined on 2 attributes. Think about the sort key as folders in the S3 bucket. It allows you to specify conditions in queries. The combination of partition key and sort key must be unique.
  • Consistency: Writes: you get an acknowledgement from at least 2 replicas, and the write is eventually consistent. Reads: two modes. The default is eventually consistent; you can also request strong consistency, but it costs more. A strongly consistent read (of an item up to 4 KB) consumes one capacity unit; an eventually consistent read consumes half a capacity unit.
  • Global Secondary Indexes: there is a limit of 20 indexes, but you can request an increase. A GSI actually creates a new table with a new partition key (set on the GSI attribute) and keeps the 2 tables in sync. Note that RCU/WCU usage is counted when using this index, so it doubles the price of writing. Eventual consistency with a GSI is less predictable.
  • Local Secondary Indexes: limit of 5 per table. Similar to a normal index in RDS. Can be created ONLY at table creation time. You can project (add) other attributes into the index. An item collection with an LSI is limited to 10 GB. See LSI.
  • GSI vs LSI
  • DynamoDB does not provide aggregation functions
  • You can query data in SQL style with PartiQL. But write the query carefully and avoid a full table scan, which will consume a lot of capacity units.
  • DynamoDB has a caching layer called DAX. Additional charges apply, since the pricing model of DAX is based on the underlying infrastructure and not on capacity units.
  • Two payment modes. Provisioned: you set how many RCUs and WCUs you want to use. It supports Auto Scaling (but it can take several minutes to scale the table). Use this for consistent or predictable traffic patterns. On-Demand: you pay only for what you use, per request. Best for unpredictable and/or random traffic. See Capacity Mode. Switching from provisioned to on-demand can take several minutes.
  • Global Tables: cross-Region tables. Async replication between tables. In case of a network problem the data is queued and eventually written to the table. No strong consistency. On simultaneous writes, the last operation wins.
  • Encryption at rest and in transit.
  • Table classes: DynamoDB has 2 classes, one for standard workloads and one for workloads where storage is the dominant cost. With the infrequent access class, storage is cheaper but capacity units cost more.
  • Some links: Quotas and Limits  Best Practices  Cost
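
To make the capacity unit arithmetic from the list above concrete, here is a small sketch. It assumes the standard unit sizes (one RCU covers a strongly consistent read of up to 4 KB per second, one WCU covers a write of up to 1 KB per second). The function names are mine and are not part of any AWS SDK.

```python
import math

RCU_SIZE_KB = 4  # one RCU = one strongly consistent read of up to 4 KB per second
WCU_SIZE_KB = 1  # one WCU = one write of up to 1 KB per second


def read_units(item_size_kb, strongly_consistent=True):
    """Capacity units consumed by reading one item of the given size."""
    units = math.ceil(item_size_kb / RCU_SIZE_KB)
    # an eventually consistent read costs half as much
    return units if strongly_consistent else units / 2


def write_units(item_size_kb):
    """Capacity units consumed by writing one item of the given size."""
    return math.ceil(item_size_kb / WCU_SIZE_KB)


def partition_throughput_mb(rcu=3000, wcu=1000):
    """Per-second throughput of a single partition at its RCU/WCU limits."""
    strong_read_mb = rcu * RCU_SIZE_KB / 1000
    return {
        "strong_read_mb": strong_read_mb,
        "eventual_read_mb": strong_read_mb * 2,
        "write_mb": wcu * WCU_SIZE_KB / 1000,
    }


print(read_units(6))                             # 6 KB item, strongly consistent
print(read_units(6, strongly_consistent=False))  # same item, eventually consistent
print(write_units(2.5))
print(partition_throughput_mb())
```

With the default limits, the last call reproduces the 12 MB / 24 MB / 1 MB per second figures quoted in the list.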

DocumentDB



  • DocumentDB is compatible with MongoDB 3.6 and 4.0 drivers and tools, and supports ACID transactions. However, the underlying infrastructure (software) is completely different, and there are a lot of differences. Migration from MongoDB to DocumentDB is possible with the DMS service.
  • Doesn’t support MongoDB sharding.
  • Supports GUI tools like Compass, Robo 3T, Studio 3T, and MongoBooster.
  • Auto-scaling storage up to 64 TB. Up to 15 read replicas. Supports global clusters with up to 5 regions. Multi-AZ support. Can scale up to millions of reads per second. Read replicas share the same storage as the master in the same region. On failover, the read replica with the highest priority becomes the new master.
  • You pay for instances, IO for read/write, storage and backup storage
  • Encryption at rest and in transit. It is not possible to encrypt an existing unencrypted cluster.
  • Change Streams: capture the changes to data in the cluster (similar to Streams in DynamoDB). Can be integrated with other AWS services through Lambda, which writes the changes to other services (Elasticsearch, S3). AWS Glue also supports DocumentDB: use it with Spark ETL to read, transform, and load data into S3 and Redshift. Athena is also supported for DocumentDB through the Athena connector.
  • Integration with IAM
  • Reads from DocumentDB are eventually consistent. Supports TTL, but the exact time of deletion is not guaranteed.
  • Common use case: write data to DocumentDB, use Lambda to push the data to Elasticsearch, and use free-text search to find what you need.
  • How Mongo “sees” the differences:
  • Compare Mongo and DocumentDB External sources comparison
  • Index: when you create a collection, a default index is created on the “_id” field. “Single Field Index”: can index any field in the collection; you can specify the index to be ASC or DESC. When you create an index on a non-existing collection, the collection is also created. Indexes can also be created on nested fields. “Compound Index”: an index on several fields. “Multikey Index”: an index on an array field, used to find an element in the array. “Compound Multikey”: a combination of one multikey field with other non-multikey fields. “Sparse Index”: use it when a field appears only in a small portion of the data (you have to specify $exists: true in the query). “TTL”: is actually also an index; each deletion incurs an IO cost. Note!! Using indexes involves more storage and IO usage and, as a result, more cost. Blog about indexes
  • Links: Quota and Limits Best Practices
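
The index types from the list above can be sketched with pymongo-style `create_index` calls. This is only an illustration: all field names are hypothetical, and the `RecordingCollection` class is a stand-in I added so the sketch runs without a cluster. Against a real DocumentDB cluster you would pass a pymongo collection instead.

```python
ASCENDING, DESCENDING = 1, -1  # same values as pymongo.ASCENDING / pymongo.DESCENDING


def create_example_indexes(collection):
    """Create one index of each kind on a pymongo-style collection.

    All field names below are hypothetical.
    """
    names = []
    # single field index, descending
    names.append(collection.create_index([("price", DESCENDING)]))
    # index on a nested field, using dot notation
    names.append(collection.create_index([("address.city", ASCENDING)]))
    # compound index on several fields
    names.append(collection.create_index([("branch", ASCENDING), ("price", DESCENDING)]))
    # multikey index: "tags" is assumed to hold an array
    names.append(collection.create_index([("tags", ASCENDING)]))
    # sparse index: only documents that contain the field are indexed
    names.append(collection.create_index([("discount", ASCENDING)], sparse=True))
    # TTL index: documents expire roughly one hour after "createdAt"
    names.append(collection.create_index([("createdAt", ASCENDING)], expireAfterSeconds=3600))
    return names


class RecordingCollection:
    """Stand-in for a real collection so the sketch runs without a cluster."""

    def __init__(self):
        self.calls = []

    def create_index(self, keys, **options):
        self.calls.append((keys, options))
        # mimic the default index name format, e.g. "price_-1"
        return "_".join("{}_{}".format(field, direction) for field, direction in keys)


fake = RecordingCollection()
print(create_example_indexes(fake))
```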

Keyspaces


  • Managed service compatible with Apache Cassandra. NoSQL database. Supports Cassandra Query Language. Tables are replicated 3 times across multiple AZs. Can recover to any point in time up to 35 days back. No multi-region support.
  • Supports “On Demand” and “provisioned” modes with Auto Scaling. Auto Scaling may take several minutes. Supports TTL (doesn’t consume read/write capacity, but has a separate cost). No limit on the number of rows or on table size.
  • Cost: you pay for read operations in chunks of 4 KB. LOCAL_QUORUM consistency costs twice as much as LOCAL_ONE consistency. Write operations are priced in chunks of 1 KB. You also pay for storage and for management operations such as restoring a table or TTL.
  • Encryption at rest and in transit.
  • There is no strong consistency. You can require a read to be consistent on at least 2 nodes (LOCAL_QUORUM). See Consistency.
  • Static columns: there is an option to define static column. The value stored in this column is shared between all rows in a logical partition. When you update the value of this column, Amazon Keyspaces applies the change automatically to all rows in the partition.
  • Access Control: Only IAM. Can control access to keyspace or table.
  • Primary key: consists of one or more partition key columns, which determine which partition the data is stored in, and one or more optional clustering columns, which define how data is clustered and sorted within a partition.
  • Indexes are not supported!!!
  • There is no full support for Cassandra operations, e.g. aggregation functions and triggers are not supported. Full Compare
  • Links Quota and Limits Examples

OpenSearch



  • OpenSearch: a fork of Elasticsearch created by AWS after the license was changed. OpenSearch includes Kibana, which lets you manage the data and visualize it. OpenSearch is not serverless, but it is fully managed, which means you need to choose the instances and storage. Common use cases: log analytics, real-time application monitoring, website search.
  • Supports Multi-AZ, but you can choose to run in a single AZ. Scaling doesn’t require downtime. Multi-Region is not supported directly, but you can define 2 clusters in different regions and use “cross cluster” replication.
  • No support for ACID transactions for multiple documents. More information.
  • Supports “On-Demand” and “Reserved” instances for cost saving
  • Can be queried with the REST API or with OpenSearch SQL. Supports asynchronous search. Supports aggregations.
  • Encryption at rest and in transit.
  • Storage tiers. "Hot": the hot tier is powered by data nodes, which are used for indexing, updating, and providing the fastest access to data. "UltraWarm": a complement to hot storage that allows fast retrieval of frequently accessed read-only data. "Cold": retains infrequently accessed data in Amazon S3; with cold storage you can detach the indices. Supports up to 3 PB of primary data. The maximum storage per node is 15 TB with i3.16xlarge instances. You can allocate about 600 TB of storage to a single domain. This is a soft limit.
  • Access Control: can be controlled with IAM or Cognito (to grant access to Kibana). The OpenSearch security plugin lets you define granular permissions for indices, documents, or fields. You can also extend Kibana with read-only views and secure multi-tenant support.
  • Integration with Logstash, Kinesis Data Firehose.
  • Logs: Slow logs: Can be enabled/disabled. Can be set on specific index. Index Slow Logs – These logs provide insights into the indexing process and can be used to fine-tune the index setup. Search Slow Logs – These logs provide insights into how fast or slow queries and fetches are performing. These logs help fine tune the performance of any kind of search operation. Error logs are exposed for the entire domain. Each log entry made into CloudWatch will be limited to 255,000 characters
  • Index rollups in Amazon OpenSearch Service let you reduce storage costs by periodically rolling up old data into summarized indices.
  • Transform jobs let you create a different, summarized view of your data centered around certain fields, so you can visualize or analyze the data in different ways.
  • Consistency: Elasticsearch reads are eventually consistent. However, there are some flags you can play with to make them strongly consistent for the same client. Source.
  • Concepts: Data is stored as JSON documents (comparable to rows in an RDS table). Examples of documents: blog posts, comments, authors. A collection of documents is called an index. Shards logically divide the data of an index into pieces; each piece can be stored on the same machine or on multiple machines. Replicas contain the same data as a shard, for high availability. A cluster is divided into nodes, and each node contains several shards or replicas. Each time you add a node, the shards redistribute themselves evenly across all nodes. Mappings define the attributes and their types in a JSON document (similar to column definitions in RDS).
  • Links: Best Practice Quota and Limits

Everything written above is correct only as of the moment of writing this post. AWS invests a lot in improving its services, so you can expect things to change.

Thursday, 17 March 2022

My First Steps with AWS SageMaker - Part 1

 Let me start by saying that I had zero knowledge about ML a week ago. I also hadn't written a single line of Python code before I was exposed to ML. So if you see or think that I am doing something really stupid or writing some "noob" code, sorry.

So what am I going to do?

I am going to predict the prices of goods by using the Linear Regression ML approach. For this I am going to use the following:


1. Dataset with basic feature engineering

2. Dataset with more advanced  feature engineering

3. Xgboost, Linear Learner and SageMaker autopilot


My Dataset looks like this 




It has the actual item. 
The branch number that the item belongs to. 
Item code (barcode).
Discount (if any)
Date fields. Since ML doesn't know what a date is, I engineered it into separate columns
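
As a rough sketch of that date engineering, the function below splits one timestamp into the year, month, day and hour columns used later in this post. The input format string is an assumption of mine; the original raw data format is not shown here.

```python
from datetime import datetime


def split_timestamp(value, fmt="%Y-%m-%d %H:%M"):
    """Turn one timestamp string into numeric year, month, day and hour columns."""
    parsed = datetime.strptime(value, fmt)
    return [parsed.year, parsed.month, parsed.day, parsed.hour]


print(split_timestamp("2022-03-14 08:15"))
```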

The dataset has several hundred thousand records, and I uploaded it to AWS S3.
The rest of the post shows the Jupyter notebook that uses S3 as the source of the data, creates the training, validation and test datasets, trains the model by using the SageMaker built-in XGBoost image, and deploys it as a REST API

Important to note:
1. The file in S3 should not contain headers
2. You don't need to specify the "label" column. The algorithm always assumes that it is the first column in the CSV file 
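
A minimal sketch of preparing a file that follows these two rules, using only the standard library: it drops the header row and moves the label column to the first position. The sample rows and the input column order are made up for illustration.

```python
import csv
import io


def to_sagemaker_csv(raw_csv, label_column):
    """Drop the header row and move the label column to the first position."""
    reader = csv.reader(io.StringIO(raw_csv))
    header = next(reader)
    label_index = header.index(label_column)
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    for row in reader:
        label = row[label_index]
        features = row[:label_index] + row[label_index + 1:]
        writer.writerow([label] + features)
    return out.getvalue()


# hypothetical input with a header and the label in the third column
raw = "BranchNum,ItemCode,ItemPrice,ItemDiscount\n006,256180,109.9,0\n012,5053990127726,8.9,0\n"
print(to_sagemaker_csv(raw, "ItemPrice"))
```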

Note the comments in the body of the code.
Each block represents a code block in the Jupyter notebook

%%time

import os
import boto3
import re
import sagemaker

# Get a SageMaker-compatible role used by this Notebook Instance.
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

### update below values appropriately ###
bucket = sagemaker.Session().default_bucket()
prefix = "sagemaker/xgboost-no-fe"
####

print(region)

-----------------------------------------------------------------------------------
%%time

import io
import boto3
import random

#split the data to training, validation and testing sets
random.seed(42)


def data_split(
    FILE_DATA,
    DATA_DIR,
    FILE_TRAIN_0,
    FILE_TRAIN_1,
    FILE_VALIDATION,
    FILE_TEST,
    PERCENT_TRAIN_0,
    PERCENT_TRAIN_1,
    PERCENT_VALIDATION,
    PERCENT_TEST,
):
    # read all rows up front; the context manager closes the source file
    with open(FILE_DATA, "r") as source:
        data = source.readlines()
    train_file_0 = open(DATA_DIR + "/" + FILE_TRAIN_0, "w")
    train_file_1 = open(DATA_DIR + "/" + FILE_TRAIN_1, "w")
    valid_file = open(DATA_DIR + "/" + FILE_VALIDATION, "w")
    tests_file = open(DATA_DIR + "/" + FILE_TEST, "w")

    num_of_data = len(data)
    num_train_0 = int((PERCENT_TRAIN_0 / 100.0) * num_of_data)
    num_train_1 = int((PERCENT_TRAIN_1 / 100.0) * num_of_data)
    num_valid = int((PERCENT_VALIDATION / 100.0) * num_of_data)
    num_tests = int((PERCENT_TEST / 100.0) * num_of_data)

    data_fractions = [num_train_0, num_train_1, num_valid, num_tests]
    split_data = [[], [], [], []]

    rand_data_ind = 0

    for split_ind, fraction in enumerate(data_fractions):
        for i in range(fraction):
            rand_data_ind = random.randint(0, len(data) - 1)
            split_data[split_ind].append(data[rand_data_ind])
            data.pop(rand_data_ind)

    for l in split_data[0]:
        train_file_0.write(l)

    for l in split_data[1]:
        train_file_1.write(l)

    for l in split_data[2]:
        valid_file.write(l)

    for l in split_data[3]:
        tests_file.write(l)

    train_file_0.close()
    train_file_1.close()
    valid_file.close()
    tests_file.close()


def write_to_s3(fobj, bucket, key):
    return (
        boto3.Session(region_name=region)
        .resource("s3")
        .Bucket(bucket)
        .Object(key)
        .upload_fileobj(fobj)
    )


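def upload_to_s3(bucket, channel, filename):
    # the object key is built from the prefix and the channel; filename is only the local source
    key = prefix + "/" + channel
    url = "s3://{}/{}".format(bucket, key)
    print("Writing to {}".format(url))
    # the context manager closes the local file once the upload finishes
    with open(filename, "rb") as fobj:
        write_to_s3(fobj, bucket, key)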
-----------------------------------------------------------------------------------
%%time
import pandas as pd

s3 = boto3.client("s3")

# Load the dataset from s3
FILE_DATA = "pricing.csv"
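s3.download_file(
    "pricing-sagemaker", "xgboost/data_no_fe_with_no_header.csv", FILE_DATA
)
# the file has no header row, so tell pandas not to treat the first record as one
data = pd.read_csv(FILE_DATA, header=None)
data.dropna(inplace=True)
data.to_csv(FILE_DATA, sep=",", header=False, index=False)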

# split the downloaded data into train/test/validation files
FILE_TRAIN_0 = "pricing.train_0"
FILE_TRAIN_1 = "pricing.train_1"
FILE_VALIDATION = "pricing.validation"
FILE_TEST = "pricing.test"
PERCENT_TRAIN_0 = 35
PERCENT_TRAIN_1 = 35
PERCENT_VALIDATION = 15
PERCENT_TEST = 15

DATA_DIR = "data"

if not os.path.exists(DATA_DIR):
    os.mkdir(DATA_DIR)

data_split(
    FILE_DATA,
    DATA_DIR,
    FILE_TRAIN_0,
    FILE_TRAIN_1,
    FILE_VALIDATION,
    FILE_TEST,
    PERCENT_TRAIN_0,
    PERCENT_TRAIN_1,
    PERCENT_VALIDATION,
    PERCENT_TEST,
)
---------------------------------------------------------------------------------------
# upload the files to the S3 bucket
upload_to_s3(bucket, "train/train_0.csv", DATA_DIR + "/" + FILE_TRAIN_0)
upload_to_s3(bucket, "train/train_1.csv", DATA_DIR + "/" + FILE_TRAIN_1)
upload_to_s3(bucket, "validation/validation.csv", DATA_DIR + "/" + FILE_VALIDATION)
upload_to_s3(bucket, "test/test.csv", DATA_DIR + "/" + FILE_TEST)
--------------------------------------------------------------------------------------
instance_type = "ml.m5.2xlarge"
output_path = "s3://{}/{}/{}/output".format(bucket, prefix, "pricing-dist-xgb")
# Xgboost supports several content types. We will use CSV
content_type = "text/csv"
--------------------------------------------------------------------------------------

import sagemaker
import boto3
from sagemaker import image_uris
from sagemaker.session import Session
from sagemaker.inputs import TrainingInput

# initialize hyperparameters
hyperparams = {
    "max_depth": "7",
    "eta": "0.2",
    "gamma": "4",
    "min_child_weight": "6",
    "subsample": "0.7",
    "objective": "reg:squarederror",
    "num_round": "100",
    "eval_metric":"rmse",
    "verbosity": "2",
}

# set an output path where the trained model will be saved
output_path = 's3://{}/{}/{}/output'.format(bucket, prefix, 'pricing-xgb-built-in-algo')

# this line automatically looks for the XGBoost image URI and builds an XGBoost container.
# specify the repo_version depending on your preference.
xgboost_container = sagemaker.image_uris.retrieve("xgboost", region, "1.2-2")

# construct a SageMaker estimator that calls the xgboost-container
estimator = sagemaker.estimator.Estimator(image_uri=xgboost_container, 
                                          hyperparameters=hyperparams,
                                          role=sagemaker.get_execution_role(),
                                          instance_count=1, 
                                          instance_type='ml.m5.2xlarge', 
                                          volume_size=5, # 5 GB 
                                          output_path=output_path)

train_input = TrainingInput(
    "s3://{}/{}/{}/".format(bucket, prefix, "train"), content_type=content_type
)
validation_input = TrainingInput(
    "s3://{}/{}/{}/".format(bucket, prefix, "validation"), content_type=content_type
)
#this one actually does the job by training the model
estimator.fit({"train": train_input, "validation": validation_input})
-------------------------------------------------------------------------------------------------------------------
# the deploy command creates a REST API endpoint
predictor = estimator.deploy(
    initial_instance_count=1, 
    instance_type="ml.m5.2xlarge",
    endpoint_name='Xgboost-basic'
)
-------------------------------------------------------------------------------------
# read the test data. We want to predict the price column, so we need to remove it from the data
# the split files have no header row, so header=None keeps the first record as data
data_train=pd.read_csv(DATA_DIR + "/" + FILE_TEST, header=None)
data_train.columns = [
    "ItemPrice",
    "BranchNum",
    "ItemCode",
    "ItemDiscount",
    "year",
    "month",
    "day",
    "hour"
]
#Remove item price column since we want to predict it
data_train = data_train.drop(["ItemPrice"], axis=1)


------------------------------------------------------------------------------------
# build the CSV payload for the endpoint and show it
import io
csv_file = io.StringIO()
# skip the first row of the test data
data_train = data_train.iloc[1: , :]
# the payload size is limited so we will use only 1000 records
# (this must happen before the data is serialized)
data_train = data_train.head(1000)

# by default sagemaker expects comma separated data
data_train.to_csv(csv_file, sep=",", header=False, index=False)
payload = csv_file.getvalue()
print(payload)
-----------------------------------------------------------------------------------
# run the prediction and print the output
runtime_client = boto3.client("runtime.sagemaker", region_name=region)
response = runtime_client.invoke_endpoint(
    EndpointName=predictor.endpoint_name, ContentType="text/csv", Body=payload 
)
result = response["Body"].read().decode("ascii")
print("Predicted values are {}.".format(result))
-------------------------------------------------------------------------------------
I am going to post the link to the notebook and s3 datafile at the end of this post.


Several observations:
The RMSE metric was bad. Its value was about 60 (the lower the value, the better). But I will give it a try.
I am going to predict the values for 3 records:

109.9,006,256180,0,2022,03,14,08
8.9,012,5053990127726,0,2022,03,14,19
14.9,003,8690784516778,0,2022,03,15,17

The first value in each row is the actual price. Let's run the prediction and see what we get

Example of one prediction:
# run the prediction and print the output
runtime_client = boto3.client("runtime.sagemaker", region_name=region)
response = runtime_client.invoke_endpoint(
    EndpointName=predictor.endpoint_name, ContentType="text/csv", Body='003,8690784516778,0,2022,03,15,17'
)
result = response["Body"].read().decode("ascii")
print("Predicted values are {}.".format(result))

I got the following results for prices:

66.95
13.73
15.08


Considering my BAD RMSE I am not surprised that I got those values

So what can I do to improve the results? Well, I don't have too many columns to play with. Let's focus on the "branch" column. Maybe the algorithm actually "thinks" that this column represents some kind of ranking. But my purpose is to make it think that the item is located in a specific branch and not in another one.
As a result the dataset now looks like this:


The "Branch" column is gone. Instead, each branch got its own column. If the item exists in a branch, that column's value is 1 and all the other branch columns get 0.
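
This transformation is usually called one-hot encoding. Below is a minimal sketch of it in plain Python; the branch list is a hypothetical subset, and this is not the code that produced my dataset. With pandas, `pd.get_dummies` does the same job.

```python
def one_hot_branch(rows, branch_index, branches):
    """Replace the branch column with one 0/1 column per known branch."""
    encoded = []
    for row in rows:
        branch = int(row[branch_index])
        rest = row[:branch_index] + row[branch_index + 1:]
        flags = [1 if branch == b else 0 for b in branches]
        encoded.append(rest + flags)
    return encoded


branches = [3, 6, 12]  # hypothetical subset of the branch numbers
rows = [
    ["109.9", "006", "256180", "0"],
    ["8.9", "012", "5053990127726", "0"],
    ["14.9", "003", "8690784516778", "0"],
]
for encoded_row in one_hot_branch(rows, branch_index=1, branches=branches):
    print(encoded_row)
```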

This dataset was also uploaded to s3.

Changes to the notebook code:
Note the change in the file name
s3.download_file(
    "pricing-sagemaker", f"xgboost/data_fe_with_no_header.csv", FILE_DATA
)

-------------------------------------------------------------------------------------------------------
The column list in the test data has different columns

data_train=pd.read_csv(DATA_DIR + "/" + FILE_TEST)
data_train.columns = [
    "ItemPrice",
    "ItemCode",
    "ItemDiscount",
    "year",
    "month",
    "day",
    "hour",
    "Is1",
    "Is2",
    "Is3",
    "Is5",
    "Is6",
    "Is7",
    "Is8",
    "Is9",
    "Is10",
    "Is12",
    "Is13",
    "Is14",
    "Is15",
    "Is16",
    "Is50",
    "Is334"
]
#Remove item price column since we want to predict it
data_train = data_train.drop(["ItemPrice"], axis=1)
------------------------------------------------------------------------------------
At first glance nothing changed. My RMSE is still something like 60. But let's run the same prediction. Maybe something changed after all

87.33
14.92
14.62

Original Values   Basic Feature Eng.   Advanced Feature Eng.
109.9             66.95                87.33
8.9               13.73                14.92
14.9              15.08                14.62
  
Except for the last value, it doesn't look too good. In part 2 I will try the same with the Linear Learner algorithm. Maybe it will do a better job
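
For reference, RMSE is the square root of the mean of the squared errors. The sketch below computes it for the three sample predictions only, so the numbers it prints are not comparable with the validation RMSE of about 60 reported during training.

```python
import math


def rmse(actual, predicted):
    """Root mean squared error between two equally long lists of numbers."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


actual = [109.9, 8.9, 14.9]
basic = [66.95, 13.73, 15.08]
advanced = [87.33, 14.92, 14.62]

print("basic:    {:.2f}".format(rmse(actual, basic)))
print("advanced: {:.2f}".format(rmse(actual, advanced)))
```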

All files from this blog can be found here


Thursday, 10 March 2022

Using CloudWatch Agent to Log messages from EC2 DotNet webApi to AWS CloudWatch

 So you have your dotnet webapi. This api runs on an AWS EC2 instance and produces logs. It would be nice if you didn't have to log in to the EC2 instance with some remote client like PuTTY, but could see the logs directly in the CloudWatch UI provided by AWS.

Why do you want to send the logs to CloudWatch? Because AWS provides you with a lot of tools that allow you to query the CloudWatch logs in a friendly way (for example with SQL) and display the results in a dashboard.

By default, when you write application logs, they are written to the location you specify and nothing more. You can also use the AWS SDK to write application logs directly to CloudWatch. But suppose today you run your instance on AWS and tomorrow you decide to move the application to another cloud. What about all the code that writes the logs to CloudWatch? It becomes irrelevant.

The solution is to use the CloudWatch Agent. It is an AWS utility that runs on the EC2 instance and monitors log files; once a file has new content, it automatically sends the content to CloudWatch. Let me show you how to use it.

What I am not going to explain here:

1. How to start the EC2 instance 

2. How to develop dotnet webapi

So I created a very basic dotnet webapi. (You know, the "weather" template.)

The only thing I added is "Serilog" logging. (If you haven't used Serilog in your .NET application, you are probably a first-year student or just emigrated from Mars.)



And I put a custom log message when accessing the "weather" endpoint



The EC2 instance I am running is CentOS 7. I followed this Microsoft guide to install the dotnet runtime (I used the dotnet 3.1 release)

I started the application:



and tested that it is accessible


I checked that Serilog produced a log file and indeed it was there


So far so good. Now we need to install the agent

sudo yum update -y

sudo yum install -y awslogs

Modify "/etc/awslogs/awslogs.conf"

Change the content to point to the same region where you started the EC2 instance.
sudo yum install amazon-cloudwatch-agent

Now you need to configure the agent.
Run the "/opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-config-wizard" command.
It opens a configuration wizard. You can accept all the defaults until the wizard asks about the log file to monitor:

For the log file provide the full path to the dotnet application log file and also specify the log group name. It will be used to identify your logs in CloudWatch.
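
For orientation, the part of the agent configuration that the wizard generates for a monitored log file looks roughly like the JSON printed by the sketch below. The log file path and the log group name are hypothetical placeholders; use the values you entered in the wizard.

```python
import json


def build_agent_logs_config(log_file_path, log_group_name):
    """Build the "logs" part of a CloudWatch agent configuration."""
    return {
        "logs": {
            "logs_collected": {
                "files": {
                    "collect_list": [
                        {
                            "file_path": log_file_path,
                            "log_group_name": log_group_name,
                            # {instance_id} is a placeholder that the agent fills in
                            "log_stream_name": "{instance_id}",
                        }
                    ]
                }
            }
        }
    }


# hypothetical path and group name, for illustration only
config = build_agent_logs_config("/home/centos/webapi/logs/app*.log", "dotnet-webapi-logs")
print(json.dumps(config, indent=2))
```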

At the end of its output, the wizard shows the location of the agent configuration file


You will need this path, because the next command starts the agent and takes this path as a parameter (replace configuration-file-path with the path printed by the wizard)
 /opt/aws/amazon-cloudwatch-agent/bin/amazon-cloudwatch-agent-ctl -a fetch-config -m ec2 -s -c file:configuration-file-path

Almost done. None of the above will work if your EC2 instance doesn't have a role that allows it to write to CloudWatch. Go ahead and create the role with the following policy and attach it to the EC2 instance
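
As a rough sketch, assuming the agent only needs to ship log files, a policy along the lines of the one printed below should be enough. The action list and the wildcard resource are my assumptions, and you may want to narrow the resource to your log group. AWS also offers the managed policy CloudWatchAgentServerPolicy for this purpose.

```python
import json


def build_log_writer_policy():
    """Build an IAM policy document that lets the agent write log events."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogStreams",
                ],
                # assumption: all log groups; restrict this in a real setup
                "Resource": "*",
            }
        ],
    }


print(json.dumps(build_log_writer_policy(), indent=2))
```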


Now access the "weather" url several times to generate some logs. (By the way, I forgot to mention that you need to open port 5001 in the EC2 security group to access the application endpoint externally.)

So let's check the CloudWatch interface for the logs.

Yes!!! We see the logs.