Tuesday, 29 March 2022

Overview of some of the AWS NoSql databases

 I do deep dive into AWS NoSQL solutions. I want to share my finding with you. 

I did the overview of the following databases

1. DynamoDB

2. DocumentDB

3. Keyspaces

4. OpenSearch


It is not a compare. Just basic overview with useful links 

Dynamo DB

  • AWS Managed – serverless with ACID support.
  • Can target up to 25 distinct items in one or more DynamoDB tables within the same AWS account and in the same Region in one transaction. The aggregate size of the items in the transaction cannot exceed 4 MB
  • Can create 256 table per account per region
  • Records= items. Each item can have a collection of attributes. Item size is limited to 400K (including attributes names). Sizing. There is no practical limit on a table's size
  • Use capacity units for throttling and pricing. For reads – RCU, for writes WCU. You can use up to 3,000 Read Capacity Units (RCUs) and up to 1,000 Write Capacity Units (WCUs) on a single partition per second. Note — this is a lot of capacity! This would allow you to read 12MB of strongly-consistent data or 24MB of eventually-consistent data per second, as well as to write 1MB of data per second
  • Scalable and highly available
  • Uses Streams for event-driven development and to log DB events. Emits events on table modification. Like triggers in RDS. Preserves modification order. Can invoke Lambda to send streams event for farther processing. (always use try catch since the lambda will retry all the time in case of exception). Each streams read request unit can return up to 1 MB of data.
  • Access control: Can control Full access to the table, Attribute access (column) or item access Access Control
  • Partition Key- defined on one attribute. Think about it as a bucket in S3. Partition key is hashed and replicated. Can contain non unique values
  • Partition Key+Sort: defined on 2 items. Think about sort key as a folders in the s3 bucket. Allow to specify conditions. The combination of primary and sort key should be unique.
  • Consistency: Writes  - you get an acknowledge from at least 2 replicas and write has eventual consistency. Reads – Two modes: default eventually consistent, you can also have strong consistency but it costs more. Strong consistency consumes one capacity unit. Eventually consistency consumes half capacity unit
  • Global Secondary indexes –there is a limit of 20 indexes, but can be requested to be increased. It actually creating new table with a new Partition Key (which is set on GSI attribute), but keeps the 2 tables in synch. Note that RCU/WCU usage are taken into consideration than using this index. It doubles the price of writing. Eventual consistency with GSI is less predicted
  • Local Secondary indexes – limit  of 5 per table. Similar to normal index in RDS. Can be created ONLY at creation time. You can project (add) other attributes to the index. Item collection with LSI is limited to 10GB LSI
  • GSI vs LSI
  • DynamoDB does not provide aggregation functions
  • Can query data in SQL style with Party. But write the query carefully and avoid full table scan. It will consume a lot of Capacity Units
  • DynamoDb has a caching layer called DAX. Additional charges apply since the procing model of DAX is based on the underlying infrastructure and not capacity units.
  • Two payment modes: Provisioned Capacity units: you set how much RCU and WCU do you want to use. It supports Auto Scaling (but it can take several minutes to scale the table). Use this for consistent or predictable traffic patterns. On-Demand: You only pay for what you use per request. Best for unpredictable or/and random traffic. Capacity Mode . Switching from provisioned to On-demand can take several minutes.
  • Global Tables: Cross-Region tables. Async replication between tables. In case of network problem the data is queued and eventually written to the table. No strong consistency. On simultaneous writes, the last operation wins.
  • Encryption at rest and at transit
  • Table Classes: Dynamo has 2 classes. One for standard workload and second for the load where the storage is dominant. With infrequent class, the storage is cheaper but the capacity units cost more
  • Some links: Quotas and Limits  Best Practices  Cost

Document DB



  • DocumentDB is compatible with MongoDB 3.6 and 4.0 drivers and tools with support of ACID transactions. However the underlying infrastructure (software) is completely different. There are a lot of differences. Migration from Mongo to DocumentDB is possible with DMS service.
  • Doesn’t support Mongo DB sharding.
  • Supports GUI tools like: Compass, Robo 3T, Studio 3T,mongoboster
  • Auto scaling with storage up to 64TB. Up to 15 Read-Replicas. Support global Cluster with up to 5 regions. Multi-AZ support. Can scale up to millions of read per second. Read Replicas share the same storage as Master in the same region. On failover read replica with the highest priority becomes the new master.
  • You pay for instances, IO for read/write, storage and backup storage
  • Encryption in rest and in transit. It is not possible to encrypt existing unencrypted cluster
  • Change Streams: capture the changes of data in the cluster (Similar to Streams in dynamoDB). Can be integrated with other AWS services with Lambda and written to other services (Elastic,S3). AWS Glue also supports DocumentDB. Use it with SPART ETL to read transform and load data into S3 and Redshift Athena is also supported for DocumentDB with Athena Connector.
  • Integration with IAM
  • Reads from DocumentDB are eventual consistant. Supports TTL, but exact time of deletion is not guaranteed.
  • Common use case: Write data to DocumentDB. Use Lambda to push the data to Elastic Search and use free text to search what you need.
  • How Mongo “sees” the differences:
  • Compare Mongo and DocumentDB External sources comparison
  • Index: when you create the collection the default index created on the “id” field. “Single Field Index”: can index on any field in the collection. You can specify the index to be ASC or DESC. When you create an index on non existing collection, the collection also created. Indexes can be also created on the nested fields. “Compound filter”: create index on several fields. “Multi Key Index”: allows to create an index on arrays to find an element in array. “Compound multi key”: combination of one “multi key” index with another non “non multi key” index. “Sparse Index”: define it is some field appears only in small portion of the data (have to specify $exists: true in the query). “TTL”: is actually also an index. Each deletion incur IO cost. Note!! Using indexes will involve more storage and IO usage and as a result more cost Blog about indexes
  • Links: Quota and Limits Best Practices

Keyspaces


  • Managed service compatible with Apache Cassandra. NoSQL database. Supports Cassandra Query Language. Tables are replicated 3 times in multiple AZ. Can recover to any point in time up to 35 days. No multi-region support.
  • Supports “On Demand” and “provisioned” mode with Auto Scaling. Auto Scaling may take several minutes. Supports TTL (doesn’t consumes read/write, but has a separate cost) capacity. No limitation for rows or table size.
  • Cost: You pay for read operations in chunks of 4K. LOCAL_QUORUM consistency costs twice vs LOCAL_ONE consistency. Write operations priced in chunks of 1K. There is also the payment for the storage and management operations like Restoring the table or TTL
  • Encryption in rest and in transit.
  • There is no strong consistency. You can demand the “read” be consistent in at least 2 nodes (LOCAL_QUORUM). Consistency
  • Static columns: there is an option to define static column. The value stored in this column is shared between all rows in a logical partition. When you update the value of this column, Amazon Keyspaces applies the change automatically to all rows in the partition.
  • Access Control: Only IAM. Can control access to keyspace or table.
  • Primary key. Can consist of one or multiple partition key columns, which determine which partitions the data is stored in, and one or more optional clustering column, which define how data is clustered and sorted within a partition
  • Indexes are not supported!!!
  • There is no full support for Cassandra operations. E.g. Aggregation functions are not supported , triggers are not supported. Full Compare
  • Links Quota and Limits Examples

Open Search



  • OpenSearch. A fork from Elastic search created by AWS after the license was changed. OpenSeach includes Kinaba that allows to managed the data and visualize it. Open search is not serverless, but fully managed. It means you need to choose the instances and storage. Common use-cases: log analytics, real-time application monitoring, website search
  • Supports Multi-AZ, but you can choose to run on the single AZ. Scaling doesn’t require downtime. Multi Region is not supported directly, but you can define 2 clusters in different regions and do “cross cluster” replication
  • No support for ACID transactions for multiple documents. More information.
  • Supports “On-Demand” and “Reserved” instances for cost saving
  • Can query with API (Rest) or OpenSearch SQL language. Supports Asynchronous search. Supports aggregations.
  • Encryption in rest and in transit.
  • Storage: Cold - retain infrequently accessed data in Amazon S3. "Hot" - hot tier is powered by data nodes which are used for indexing, updating, and providing the fastest access to data. With Cold storage you can detach the indices. "UltraWarm" – Compliment to hot storage allows for fast retrieval of frequently accessed read-only data. Supports up to 3 PB of primary data. The maximum storage per node is 15 TB with the i3.16xlarge instances.  You can allocate about 600 TB of storage to a single domain. This is a soft limit.
  • Access Control: can be controlled with IAM or Cognito (to grant access to Kibana). OpenSearch security plugin, enabling you to define granular permissions for indices, documents, or fields. You can also extend Kibana with read-only views and secure multi-tenant support.
  • Integration with Logstash, Kinesis Data Firehose.
  • Logs: Slow logs: Can be enabled/disabled. Can be set on specific index. Index Slow Logs – These logs provide insights into the indexing process and can be used to fine-tune the index setup. Search Slow Logs – These logs provide insights into how fast or slow queries and fetches are performing. These logs help fine tune the performance of any kind of search operation. Error logs are exposed for the entire domain. Each log entry made into CloudWatch will be limited to 255,000 characters
  • Index rollups in Amazon OpenSearch Service let you reduce storage costs by periodically rolling up old data into summarized indices.
  • Transform jobs let you create a different, summarized view of your data centered around certain fields, so you can visualize or analyze the data in different ways.
  • Consistency: Elasticsearch read consistency is eventually consistent. However there are some flags you can play with to make it strong consistent for the same client. Source.
  • Concepts: Data is stored as a JSON document (table in RDS).Examples for document: blogpost, comments, authors. Collection of the documents are called index. Shards – logically divide your data to pieces. Each piece can be stored on the same machine or multiple machines. Collection of the shards are called Cluster. Replicas – contain the same data as shard for high availability. Cluster divided to nodes. Each node contains several shards or replicas. Each time you add a node , overall shards will distribute themselves to all nodes evenly. Mappings – definition of the attributes and their types in json document. (similar to RDS columns definition)
  • Links: Best Practice Quota and Limits

All of the written above is correct only to the moment of writing this post. AWS investing a lot in improving it's services, so you may expect the things to change.

Thursday, 17 March 2022

My First Steps with AWS SageMaker - Part 1

 Let me start by saying that I had zero knowledge about ML before 1 week. I also didn't write a single line of Python code before I was exposed to ML. So if you see or think that I am doing something really stupide or writing some "noob" code - sorry,

So what I am going to do. 

I an going to predict the prices of the goods by using Linear Regression ML approach. For this I am going to use the following 


1. Dataset with basic feature engineering

2. Dataset with more advanced  feature engineering

3. Xgboost, Linear Learner and SageMaker autopilot


My Dataset looks like this 




It has the actual item. 
The branch number that the item belongs to. 
Item code (barcode).
Discount (if any)
Date fields. Since ML doesn't know what is date I engineered it to separate columns

The dataset has several hundred thousands records and I uploaded it to the AWS s3.
The rest of the code shows the Jupiter notebook that uses S3 as the source of the data, creating training, validation and test datasets, train the model by using Sagemaker build in Xgboost image and deploy as rest api

Important to note:
1. The file in s3 should not  contain headers
2. You don't need to specify the "label" column. The algorithm always assumes that it is the first column in the CSV file 

Note the comments in the body of the code.
Each block represents the code block in the Jupiter notebook

%%time

import os
import boto3
import re
import sagemaker

# Get a SageMaker-compatible role used by this Notebook Instance.
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

### update below values appropriately ###
bucket = sagemaker.Session().default_bucket()
prefix = "sagemaker/xgboost-no-fe"
####

print(region)

-----------------------------------------------------------------------------------
%%time

import io
import boto3
import random

#split the data to training, validation and testing sets
random.seed(42)


def data_split(
    FILE_DATA,
    DATA_DIR,
    FILE_TRAIN_BASE,
    FILE_TRAIN_1,
    FILE_VALIDATION,
    FILE_TEST,
    PERCENT_TRAIN_0,
    PERCENT_TRAIN_1,
    PERCENT_VALIDATION,
    PERCENT_TEST,
):
    data = [l for l in open(FILE_DATA, "r")]
    train_file_0 = open(DATA_DIR + "/" + FILE_TRAIN_0, "w")
    train_file_1 = open(DATA_DIR + "/" + FILE_TRAIN_1, "w")
    valid_file = open(DATA_DIR + "/" + FILE_VALIDATION, "w")
    tests_file = open(DATA_DIR + "/" + FILE_TEST, "w")

    num_of_data = len(data)
    num_train_0 = int((PERCENT_TRAIN_0 / 100.0) * num_of_data)
    num_train_1 = int((PERCENT_TRAIN_1 / 100.0) * num_of_data)
    num_valid = int((PERCENT_VALIDATION / 100.0) * num_of_data)
    num_tests = int((PERCENT_TEST / 100.0) * num_of_data)

    data_fractions = [num_train_0, num_train_1, num_valid, num_tests]
    split_data = [[], [], [], []]

    rand_data_ind = 0

    for split_ind, fraction in enumerate(data_fractions):
        for i in range(fraction):
            rand_data_ind = random.randint(0, len(data) - 1)
            split_data[split_ind].append(data[rand_data_ind])
            data.pop(rand_data_ind)

    for l in split_data[0]:
        train_file_0.write(l)

    for l in split_data[1]:
        train_file_1.write(l)

    for l in split_data[2]:
        valid_file.write(l)

    for l in split_data[3]:
        tests_file.write(l)

    train_file_0.close()
    train_file_1.close()
    valid_file.close()
    tests_file.close()


def write_to_s3(fobj, bucket, key):
    return (
        boto3.Session(region_name=region)
        .resource("s3")
        .Bucket(bucket)
        .Object(key)
        .upload_fileobj(fobj)
    )


def upload_to_s3(bucket, channel, filename):
    fobj = open(filename, "rb")
    key = prefix + "/" + channel
    url = "s3://{}/{}/{}".format(bucket, key, filename)
    print("Writing to {}".format(url))
    write_to_s3(fobj, bucket, key)
-----------------------------------------------------------------------------------
%%time
import pandas as pd

s3 = boto3.client("s3")

# Load the dataset from s3
FILE_DATA = "pricing.csv"
s3.download_file(
    "pricing-sagemaker", f"xgboost/data_no_fe_with_no_header.csv", FILE_DATA
)
data=pd.read_csv(FILE_DATA)
data.dropna(inplace=True)
data.to_csv(FILE_DATA, sep=",", index=False)

# split the downloaded data into train/test/validation files
FILE_TRAIN_0 = "pricing.train_0"
FILE_TRAIN_1 = "pricing.train_1"
FILE_VALIDATION = "pricing.validation"
FILE_TEST = "pricing.test"
PERCENT_TRAIN_0 = 35
PERCENT_TRAIN_1 = 35
PERCENT_VALIDATION = 15
PERCENT_TEST = 15

DATA_DIR = "data"

if not os.path.exists(DATA_DIR):
    os.mkdir(DATA_DIR)

data_split(
    FILE_DATA,
    DATA_DIR,
    FILE_TRAIN_0,
    FILE_TRAIN_1,
    FILE_VALIDATION,
    FILE_TEST,
    PERCENT_TRAIN_0,
    PERCENT_TRAIN_1,
    PERCENT_VALIDATION,
    PERCENT_TEST,
)
---------------------------------------------------------------------------------------
# upload the files to the S3 bucket
upload_to_s3(bucket, "train/train_0.csv", DATA_DIR + "/" + FILE_TRAIN_0)
upload_to_s3(bucket, "train/train_1.csv", DATA_DIR + "/" + FILE_TRAIN_1)
upload_to_s3(bucket, "validation/validation.csv", DATA_DIR + "/" + FILE_VALIDATION)
upload_to_s3(bucket, "test/test.csv", DATA_DIR + "/" + FILE_TEST)
--------------------------------------------------------------------------------------
instance_type = "ml.m5.2xlarge"
output_path = "s3://{}/{}/{}/output".format(bucket, prefix, "pricing-dist-xgb")
# Xgboost supports several content types. We will use CSV
content_type = "text/csv"
--------------------------------------------------------------------------------------

import sagemaker
import boto3
from sagemaker import image_uris
from sagemaker.session import Session
from sagemaker.inputs import TrainingInput

# initialize hyperparameters
hyperparams = {
    "max_depth": "7",
    "eta": "0.2",
    "gamma": "4",
    "min_child_weight": "6",
    "subsample": "0.7",
    "objective": "reg:squarederror",
    "num_round": "100",
    "eval_metric":"rmse",
    "verbosity": "2",
}

# set an output path where the trained model will be saved
output_path = 's3://{}/{}/{}/output'.format(bucket, prefix, 'pricing-xgb-built-in-algo')

# this line automatically looks for the XGBoost image URI and builds an XGBoost container.
# specify the repo_version depending on your preference.
xgboost_container = sagemaker.image_uris.retrieve("xgboost", region, "1.2-2")

# construct a SageMaker estimator that calls the xgboost-container
estimator = sagemaker.estimator.Estimator(image_uri=xgboost_container, 
                                          hyperparameters=hyperparams,
                                          role=sagemaker.get_execution_role(),
                                          instance_count=1, 
                                          instance_type='ml.m5.2xlarge', 
                                          volume_size=5, # 5 GB 
                                          output_path=output_path)

train_input = TrainingInput(
    "s3://{}/{}/{}/".format(bucket, prefix, "train"), content_type=content_type
)
validation_input = TrainingInput(
    "s3://{}/{}/{}/".format(bucket, prefix, "validation"), content_type=content_type
)
#this one actually does the job by training the model
estimator.fit({"train": train_input, "validation": validation_input})
-------------------------------------------------------------------------------------------------------------------
# deploy command will create a rest api end poing
predictor = estimator.deploy(
    initial_instance_count=1, 
    instance_type="ml.m5.2xlarge",
    endpoint_name='Xgboost-basic'
)
-------------------------------------------------------------------------------------
# read the test data. We want to predict the price column, so we need to remove it with all the data it has
data_train=pd.read_csv(DATA_DIR + "/" + FILE_TEST)
data_train.columns = [
    "ItemPrice",
    "BranchNum",
    "ItemCode",
    "ItemDiscount",
    "year",
    "month",
    "day",
    "hour"
]
#Remove item price column since we want to predict it
data_train = data_train.drop(["ItemPrice"], axis=1)


------------------------------------------------------------------------------------
# just show the data 
import io
from io import StringIO
csv_file = io.StringIO()
data_train = data_train.iloc[1: , :]

# by default sagemaker expects comma seperated data
data_train.to_csv(csv_file, sep=",", header=False, index=False)
# the payload size is limited so we will use only 1000 records
data_train  =data_train.head(1000)
payload = csv_file.getvalue()
print(payload )
-----------------------------------------------------------------------------------
# run the prediction and print the output
runtime_client = boto3.client("runtime.sagemaker", region_name=region)
response = runtime_client.invoke_endpoint(
    EndpointName=predictor.endpoint_name, ContentType="text/csv", Body=payload 
)
result = response["Body"].read().decode("ascii")
print("Predicted values are {}.".format(result))
-------------------------------------------------------------------------------------
I am going to post the link to the notebook and s3 datafile at the end of this post.


Several observations:
RMSE mertic was bad. Its value was about 60. (the lower the value the better). But I will give it a try
I am going to predict the values for 3 records:

109.9,006,256180,0,2022,03,14,08
8.9,012,5053990127726,0,2022,03,14,19
14.9,003,8690784516778,0,2022,03,15,17

The bold values are the actual prices. Lets run the prediction and see what we get 

Example of one prediction:
# run the prediction and print the output
runtime_client = boto3.client("runtime.sagemaker", region_name=region)
response = runtime_client.invoke_endpoint(
    EndpointName=predictor.endpoint_name, ContentType="text/csv", Body='003,8690784516778,0,2022,03,15,17'
)
result = response["Body"].read().decode("ascii")
print("Predicted values are {}.".format(result))

I got the following results for prices:

66.95
13.73
15.08


Considering my BAD RMSE I am not surprised that I got those values

So what can I do to improve the results? Well I don't have too much columns to play with. Lets  focus on the "branch" column. Maybe the algorithm actually "thinks" that this column represents some kind of ranking. But my purpose is to make it think that the item is actually located in specific branch and not located in another branch
As a result the dataset now looks like


"Branch" column is gone. Instead, each branch got it's own column. If item exists in this branch the column value will be 1 and all the rest of the column will get 0.

This dataset was also uploaded to s3.

Changes to the notebook code:
Note the change in the file name
s3.download_file(
    "pricing-sagemaker", f"xgboost/data_fe_with_no_header.csv", FILE_DATA
)

-------------------------------------------------------------------------------------------------------
Column list in the test data has different column

data_train=pd.read_csv(DATA_DIR + "/" + FILE_TEST)
data_train.columns = [
    "ItemPrice",
    "ItemCode",
    "ItemDiscount",
    "year",
    "month",
    "day",
    "hour",
    "Is1",
    "Is2",
    "Is3",
    "Is5",
    "Is6",
    "Is7",
    "Is8",
    "Is9",
    "Is10",
    "Is12",
    "Is13",
    "Is14",
    "Is15",
    "Is16",
    "Is50",
    "Is334"
]
#Remove item price column since we want to predict it
data_train = data_train.drop(["ItemPrice"], axis=1)
------------------------------------------------------------------------------------
At the first glans nothing chaned. My RMSE still something like 60. But lets run the same prediction. Maybe something changed after all

87.33
14.92
14.62

Original Values   Basic Feature Eng.   Advanced Feature Eng.
109.9                          66.95                87.33
8.9                              13.73                14.92
14.9                            15.08                14.62
  
Except the last value it doesn't looks to good. In the part 2 I will try the same with Linear Learner algorithm. Maybe it will do a better job

All files from this blog can be found here