
How To Ingest Both Real-Time And Batch Data Into AWS Timestream Using Lambda And Execute Queries From PyCharm


I recently had a chance to work with this NoSQL time-series database, so I thought of sharing a simple framework: use Lambda to ingest data from S3 into AWS TimeStream, then run queries remotely from your local PyCharm.

Architecture Diagram:

What’s happening in the architecture diagram:

This is a more detailed view of how both real-time and batch data are ingested concurrently using a single Lambda.

Let’s first examine the tech stack, which consists of:

  • AWS S3
  • AWS Lambda
  • AWS CloudWatch
  • AWS TimeStream
  • Pycharm

We will now upload the source CSV data into the S3 Raw Source bucket. That upload triggers the splitting Lambda. In the Lambda console, we can add an S3 trigger scoped to that specific bucket (the same notification can also be routed through EventBridge).
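If you prefer to wire this trigger up in code rather than in the console, a minimal sketch with boto3 could look like the one below. The bucket name, Lambda ARN, and suffix filter are placeholders for illustration; note that the function also needs a resource-based permission allowing S3 to invoke it (the console adds this for you).

import boto3

s3 = boto3.client('s3')

# Hypothetical names - replace with your own bucket and splitter Lambda ARN
RAW_BUCKET = 'raw-source-bucket'
SPLITTER_LAMBDA_ARN = 'arn:aws:lambda:us-east-1:123456789012:function:splitter-lambda'

# Invoke the splitter Lambda whenever a CSV object lands in the Raw Source bucket
s3.put_bucket_notification_configuration(
    Bucket=RAW_BUCKET,
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': SPLITTER_LAMBDA_ARN,
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {'Key': {'FilterRules': [{'Name': 'suffix', 'Value': '.csv'}]}}
        }]
    }
)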

Purpose of Splitter Lambda:

When it comes to batch data, file sizes can vary from a few MB to several GB. Lambda's maximum memory is 10,240 MB, and during transformation and ingestion it can only safely process files up to about 200 MB. This Lambda therefore breaks large files into processable chunks based on a user-provided size limit.

For instance, the Lambda here specifies a size limit of 200 MB. If the file size exceeds this limit, the Lambda splits the file into chunks and loads them into the S3 Structured Source bucket; if not, it copies the original file as-is. In both cases, date partitioning is used to create the destination folders.

Below is the Python script that splits large files into chunks based on the provided size.

import boto3
import os
import datetime
import awswrangler as wr
import numpy as np

s3 = boto3.client('s3')

def lambda_handler(event, context):

    ## Attributes extraction from parameters
    # Destination Bucket name
    dest_bucket = os.environ['bucket_name']
    
    ## Extraction of Source Path
    event_result = event['Records']
    bucket_name = ''.join([sub['s3']['bucket']['name'] for sub in event_result])
    object_name = ''.join([sub['s3']['object']['key'] for sub in event_result])
    path = "s3://" + bucket_name + "/" + object_name
    
    ## File name (without extension) used to build the destination keys
    desired_name = os.path.splitext(os.path.basename(object_name))[0]
    
    ## Gets the object metadata (for its size)
    size_reader = s3.get_object(Bucket=bucket_name, Key=object_name)

    ## Providing the required size to chunk
    chunk_size = 200 * 1024 * 1024

    ## Reading data from Source
    df = wr.s3.read_csv(path)

    ## partition for upload split DataFrame to S3
    today = datetime.datetime.now()
    datim = today.strftime("%Y%m%dT%H%M%S")
    year = today.strftime("%Y")
    month = today.strftime("%m")
    day = today.strftime("%d")
    
    ## Getting the size value of the file
    file_size = size_reader['ContentLength']
    
    if file_size > chunk_size:
        ## Logic for splitting the actual size value into chunks based on chunk_size variable
        num_chunks = file_size // chunk_size + 1
        
        ## Using numpy's array_split function, the dataframe is split into chunks
        chunks = np.array_split(df, num_chunks)
        
        ## The chunked dataframes are uploaded to S3 in a loop
        for i, chunk in enumerate(chunks):
            chunk_data = chunk.to_csv(index=False)
            output = "raw_source/" + year + "/" + month + "/" + day + "/" + desired_name + "/" + "part" + str(i + 1) + "-" + desired_name + ".csv"
            s3.put_object(Body=chunk_data, Bucket=dest_bucket, Key=output)
    else:
        ## When the file size is smaller than the limit, the whole file is copied to S3 as-is
        s3_c = boto3.resource('s3')
        copy_source = {
            'Bucket': bucket_name,
            'Key': object_name
        }
        output = "raw_source/" + year + "/" + month + "/" + day + "/" + desired_name + "/" + desired_name + ".csv"
        # print(output)
        s3_c.meta.client.copy(copy_source, dest_bucket, output)

        
    return {
        'statusCode': 200
    }

 

The Transformation Lambda activates once the chunked files (or the small original file) land in the S3 Structured Source bucket.

Purpose of Transformation Lambda:

The primary objective of this lambda is to process both real-time and batch data in accordance with the trigger types.

This Lambda is triggered every time a file lands in the S3 Structured Source bucket, and the real-time or batch logic is applied based on the file size.

(Note: According to the scenario in which I worked, the real-time data was less than 1 MB and the batch data size was between 50 and 200 MB.)

For instance, the real-time logic will execute if the file size is less than 1 MB, and the batch logic will run if the file size is greater than that.
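Put together, the dispatch inside the handler boils down to a sketch like the following. The threshold and branch bodies are hypothetical stand-ins for the real-time and batch logic shown in the full script further below.

# Simplified dispatch sketch; the two helpers are placeholders for illustration only
REALTIME_THRESHOLD = 1 * 1024 * 1024  # 1 MB, per the scenario described above

def ingest_realtime(rows):
    print("would transform and write_records straight into TimeStream:", len(rows), "rows")

def stage_for_batch_load(rows):
    print("would transform and stage a CSV in S3 for a Batch Load Task:", len(rows), "rows")

def handle_structured_source_file(file_size_bytes, rows):
    if file_size_bytes < REALTIME_THRESHOLD:
        ingest_realtime(rows)
    else:
        stage_for_batch_load(rows)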

What does real-time logic do?

We loop over the records for the transformation and mapping process, appending the transformed data to a list. We then use the Boto3 write_records function to ingest that list into AWS TimeStream, as sketched below.
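For context, each element of that list is a multi-measure record dictionary. A minimal sketch of one record, with illustrative dimension and measure names that mirror the full script later in this post, looks roughly like this:

from time import time

# Illustrative multi-measure record; the dimension value and measure value are placeholders
current_time_ms = str(int(time() * 1000))
record = [{
    'Time': current_time_ms,  # record timestamp in milliseconds
    'Dimensions': [{'Name': 'DimensionName', 'Value': 'sensor_01'}],
    'MeasureName': 'reading',
    'MeasureValues': [
        {'Name': 'source_time', 'Value': current_time_ms, 'Type': 'TIMESTAMP'},
        {'Name': 'measure_value', 'Value': '23.5', 'Type': 'DOUBLE'}
    ],
    'MeasureValueType': 'MULTI'
}]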

response = client.write_records(DatabaseName=DB_NAME, TableName=TBL_NAME, Records=record)

What does Batch Logic do?

We apply the same for-loop to the transformation and mapping process, append the results to a list, and store them as a file in a sub-folder under the S3 Structured Source bucket.

Now, a built-in feature known as Batch Load Task is available for ingesting bulk data into AWS TimeStream. It helps insert GBs of data from S3 into TimeStream in a single run.

For example, once all the batch loads for the day are completed and all the transformed data is stored under a sub-folder in the S3 Structured Source bucket, a cron-based AWS CloudWatch Rule triggers the same Lambda at the end of the day. That invocation runs the create_batch_load_task function, which creates a Batch Load Task in AWS TimeStream to ingest all the data from the pointed source (AWS S3) into the target TimeStream table.
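Such a scheduled rule can be created in the CloudWatch/EventBridge console, or sketched with boto3 as below. The rule name, cron expression, and Lambda ARN are illustrative placeholders.

import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

# Hypothetical names - replace with your own rule name and transformation Lambda ARN
RULE_NAME = 'timestream-batch-load-nightly'
TRANSFORM_LAMBDA_ARN = 'arn:aws:lambda:us-east-1:123456789012:function:transformation-lambda'

# Fire once a day at 23:30 UTC
rule = events.put_rule(Name=RULE_NAME, ScheduleExpression='cron(30 23 * * ? *)')

# Allow the rule to invoke the Lambda, then register the Lambda as the rule's target
lambda_client.add_permission(
    FunctionName=TRANSFORM_LAMBDA_ARN,
    StatementId='allow-nightly-batch-load-rule',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn']
)
events.put_targets(Rule=RULE_NAME, Targets=[{'Id': 'transformation-lambda', 'Arn': TRANSFORM_LAMBDA_ARN}])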

The following is a template for creating a Batch Load Task using the Python SDK:

response = client.create_batch_load_task(TargetDatabaseName=DB_NAME, TargetTableName=TBL_NAME,
                                 DataModelConfiguration={
                                     'DataModel': {
                                         "TimeColumn": "Timestamp",
                                         "TimeUnit": "MILLISECONDS",
                                         "DimensionMappings": [{
                                             "SourceColumn": "Dimension_Columnname_from_S3",
                                             "DestinationColumn": "Dimension_Columnname_to_TimeStream"
                                         }
                                         ],
                                         "MultiMeasureMappings": {
                                             "MultiMeasureAttributeMappings": [
                                                 {
                                                     "SourceColumn": "MeasureValue_Column_from_S3",
                                                     "TargetMultiMeasureAttributeName": "MeasureValue_Column_to_TimeStream",
                                                     "MeasureValueType": "DOUBLE"
                                                 },
                                                 {
                                                    "SourceColumn": "source_time_from_S3",
                                                    "TargetMultiMeasureAttributeName": "source_time_to_TimeStream",
                                                    "MeasureValueType": "TIMESTAMP"
                                                  }
                                             ]
                                         },
                                         "MeasureNameColumn": "measure_name"
                                     }
                                 },
                                 DataSourceConfiguration={
                                     "DataSourceS3Configuration": {
                                         "BucketName": 'bucket_name',
                                         "ObjectKeyPrefix": output
                                     },
                                     "DataFormat": "CSV"
                                 },
                                 ReportConfiguration={
                                     "ReportS3Configuration": {
                                         "BucketName": 'bucket_name',
                                         "ObjectKeyPrefix": 'sub_folder',
                                         "EncryptionOption": 'SSE_S3'
                                     }
                                 }
                                 )
print(response)

 

The complete Python script that can handle both real-time and batch data simultaneously based on S3 and CloudWatch Rule triggers is below:

import boto3
import json
import awswrangler as wr
import os
import csv
import io
import datetime
from time import time

s3 = boto3.client('s3')

def lambda_handler(event, context):
    ## Capture Transformation Start time
    time_var = datetime.datetime.now()
    cur = time_var.strftime("%d/%m/%Y %H:%M:%S")
    print("Process started at", cur)
    
    ## Attributes extraction from parameters
    # TimeStream Details
    DB_NAME = os.environ['DB_NAME']
    TBL_NAME = os.environ['TBL_NAME']
    
    ## TimeStream write client
    client = boto3.client('timestream-write')

    ## Identify the Lambda trigger: an S3 event or a scheduled CloudWatch rule
    try:
        event_trigger = event['Records'][0]['eventSource']
    except (KeyError, TypeError):
        event_trigger = event['source']
    
    if event_trigger == 'aws:s3':
        print("Entering record transformation and mapping Process")
        ## Extraction of Source Path
        event_result = event['Records']
        print(event_result)
        bucket_name = ''.join([sub['s3']['bucket']['name'] for sub in event_result])
        object_name = ''.join([sub['s3']['object']['key'] for sub in event_result])
        
        path = "s3://" + bucket_name + "/" + object_name
        
        ## Gets the object metadata (for its size)
        size_reader = s3.get_object(Bucket=bucket_name, Key=object_name)
    
        ## Size threshold that separates the near-real-time path from the batch path
        desired_size = 10 * 1024 * 1024
        
        ## Getting the size value of the file
        file_size = size_reader['ContentLength']

        df = wr.s3.read_csv(path)
        
        ## Records conversion from csv to dictionary
        json_df = df.to_json(orient="records")
        source = json.loads(json_df)
        
        ## Extraction, transformation and mapping of data from the source
        
        record = []
        for i in range(0, len(source)):
            Product_list = {key.strip(): value for key, value in source[i].items()}

            #---Time transformation from String to MilliSeconds for source data---#
            time_val = Product_list['Timestamp']
            timestamp_epoch = datetime.datetime.strptime(time_val, "%m/%d/%Y %H:%M:%S")
            timestamp_millisecond = timestamp_epoch.timestamp() * 1000
            millisecond_varchar = str(timestamp_millisecond)
            valid_time = millisecond_varchar.replace(".0", "")
            
            #---Current Time transformation from String to MilliSeconds for source data---#
            milliseconds = int(time() * 1000)
            # print(milliseconds)
            
            if file_size < desired_size:
                ## Process to read and write the source data for the near-real-time use-case
                key_list = list(Product_list.keys())
                dims = key_list[1]  # the second source column is assumed to hold the measure value
                block = dims[:2]
                measure_name = str(block)
                time_column = {'Name': 'source_time', 'Value': str(valid_time), 'Type': 'TIMESTAMP'}
                temp_val = {'Name': 'measure_value', 'Value': str(Product_list[dims]), 'Type': 'DOUBLE'}
                dimension = [{'Name': 'DimensionName', 'Value': dims}]
                overall_value = {'Time': str(milliseconds), 'Dimensions': dimension, 'MeasureName': measure_name,
                                 'MeasureValues': [time_column, temp_val], 'MeasureValueType': 'MULTI'}
                record = [overall_value]
                print(record)
                try:
                    response = client.write_records(DatabaseName=DB_NAME, TableName=TBL_NAME, Records=record)
                    print("WriteRecords Status: [%s]" % response['ResponseMetadata']['HTTPStatusCode'])
                except client.exceptions.RejectedRecordsException as err:
                    print("RejectedRecords: ", err)
                    for rr in err.response["RejectedRecords"]:
                        print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
                    print("Other records were written successfully.")
                except Exception as err:
                    print("Error:", err)

            if file_size > desired_size:
                ## Accumulate transformed rows for the batch load use-case
                key_list = list(Product_list.keys())
                dims = key_list[1]
                measure_name = str(dims[:2])
                overall_value = {'DimensionName': dims, 'Timestamp': str(milliseconds),
                                 'measure_value': str(Product_list[dims]), 'source_time': valid_time,
                                 'measure_name': measure_name}
                record.append(overall_value)
       
        if file_size > desired_size:
            ## Curation folder partition process for the transformed values, since batch_load_task only accepts files such as CSV, JSON, etc.
            today = datetime.datetime.now()
            datim = today.strftime("%Y%m%dT%H%M%S")
            year = today.strftime("%Y")
            month = today.strftime("%m")
            day = today.strftime("%d")
            output = "curated_source/" + year + "/" + month + "/" + day + "/" + dims + "/" + dims + "-" + datim + ".csv"
            
            stream = io.StringIO()
            headers = list(record[0].keys())
            writer = csv.DictWriter(stream, fieldnames=headers)
            writer.writeheader()
            writer.writerows(record)
            
            csv_string_object = stream.getvalue()
            
            s3_res = boto3.resource('s3')
            s3_res.Object('bucket_name', output).put(Body=csv_string_object)
        
    if event_trigger == 'aws.events':
        print("Entering record uploading to TimeStream Process")
        # Process to read and write the source data using the batch_load_task function for the batch load use-case
        today = datetime.datetime.now()
        datim = today.strftime("%Y%m%dT%H%M%S")
        year = today.strftime("%Y")
        month = today.strftime("%m")
        day = today.strftime("%d")
        output = "curated_source/" + year + "/" + month + "/" + day + "/"

        response = client.create_batch_load_task(TargetDatabaseName=DB_NAME, TargetTableName=TBL_NAME,
                                                  DataModelConfiguration={
                                                      'DataModel': {
                                                          "TimeColumn": "Timestamp",
                                                          "TimeUnit": "MILLISECONDS",
                                                          "DimensionMappings": [{
                                                              "SourceColumn": "Dimension_Columnname_from_S3",
                                                              "DestinationColumn": "Dimension_Columnname_to_TimeStream"
                                                          }],
                                                          "MultiMeasureMappings": {
                                                              "MultiMeasureAttributeMappings": [
                                                                  {
                                                                      "SourceColumn": "MeasureValue_Column_from_S3",
                                                                      "TargetMultiMeasureAttributeName": "MeasureValue_Column_to_TimeStream",
                                                                      "MeasureValueType": "DOUBLE"
                                                                  },
                                                                  {
                                                                      "SourceColumn": "source_time_from_S3",
                                                                      "TargetMultiMeasureAttributeName": "source_time_to_TimeStream",
                                                                      "MeasureValueType": "TIMESTAMP"
                                                                  }
                                                              ]
                                                          },
                                                          "MeasureNameColumn": "measure_name"
                                                      }
                                                  },
                                                  DataSourceConfiguration={
                                                      "DataSourceS3Configuration": {
                                                          "BucketName": 'bucket_name',
                                                          "ObjectKeyPrefix": output
                                                      },
                                                      "DataFormat": "CSV"
                                                  },
                                                  ReportConfiguration={
                                                      "ReportS3Configuration": {
                                                          "BucketName": 'bucket_name',
                                                          "ObjectKeyPrefix": 'sub_folder',
                                                          "EncryptionOption": 'SSE_S3'
                                                      }
                                                  }
                                                  )
        print(response)
            
    ## Capture Transformation End time
    var_time = datetime.datetime.now()
    cur_end = var_time.strftime("%d/%m/%Y %H:%M:%S")
    print("Process ended at", cur_end)
    
    
    return {
        'statusCode': 200
    }

 

Interacting with AWS TimeStream from PyCharm:

First, integrate the AWS environment with PyCharm to run queries against the TimeStream table. Get the AWS Access Key ID and Secret Access Key, and if you are using an organization account with temporary credentials, get the Session Token as well.

Install the AWS Toolkit plugin in PyCharm and the AWS CLI locally, then open Terminal on a Mac or cmd on Windows.

# Use one of the following approaches to configure the AWS credentials.

# On Mac (edit the shared credentials file directly):
naveenchandarb@cmd ~ % vi ~/.aws/credentials
[default]
aws_access_key_id = ************************
aws_secret_access_key = ************************
aws_session_token = ************************

# On Windows (run the AWS CLI configuration wizard):
C:\> aws configure
AWS Access Key ID [None]: ************************
AWS Secret Access Key [None]: ************************
Default region name [None]: ************************
Default output format [None]: json
C:\> aws configure set aws_session_token ************************

 

After configuring AWS credentials on the local machine, go to PyCharm, create a Python file, and use the below script to interact with the AWS TimeStream.

import boto3

timestream_client = boto3.client('timestream-query')
paginator = timestream_client.get_paginator('query')


def execute_query(query):
    rows = []
    column_names = []

    # The paginator follows NextToken automatically and yields one result page at a time
    for page in paginator.paginate(QueryString=query):
        if not column_names:
            column_names = [column['Name'] for column in page['ColumnInfo']]
        rows.extend(page.get('Rows', []))

    return rows, column_names


def parse_rows(rows):
    parsed_rows = []

    for row in rows:
        parsed_row = []
        for data in row['Data']:
            parsed_row.append(data.get('ScalarValue'))
        parsed_rows.append(parsed_row)
    return parsed_rows


def run_query(query):
    rows, column_names = execute_query(query)
    parsed_rows = parse_rows(rows)
    return parsed_rows, column_names


query = """
UNLOAD (
  SELECT Dimension_Name,
    AVG(measure_value) AS avg_value,
    bin(time, 15m) AS avg_time
  FROM "timestream_db"."timestream_table"
  WHERE
    time >= '2018-01-01 00:00:00' AND time <= '2018-01-10 00:00:00'
    AND measure_name = 'Medium'
  GROUP BY
    Dimension_Name, bin(time, 15m)
)
TO 's3://bucket_name/sub_folders/' WITH (format = 'CSV', compression = 'NONE')
"""

result, column_names = run_query(query)

table = [column_names] + result

# Print the result
for row in table:
    print(row)
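
The same helpers also work for ordinary (non-UNLOAD) queries. As a quick sanity check, assuming the database and table names used above, you could run something like:

# A simple ad-hoc query through the run_query helper defined above
preview_query = 'SELECT * FROM "timestream_db"."timestream_table" ORDER BY time DESC LIMIT 10'
preview_rows, preview_columns = run_query(preview_query)

print(preview_columns)
for row in preview_rows:
    print(row)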

 

I hope this blog helps you perform TimeStream operations with Lambda without complex coding: remote query execution and the ingestion of both real-time and batch data using the Python SDK.

 

Naveen B
