Automating bucketing of streaming data using Amazon Athena and AWS Lambda

In today's world, data plays a vital role in helping businesses understand and improve their processes and services to reduce cost. You can use several tools to gain insights from your data, such as Amazon Kinesis Data Analytics or open-source frameworks like Structured Streaming and Apache Flink to analyze the data in real time. Alternatively, you can batch analyze the data by ingesting it into a centralized storage known as a data lake. Data lakes allow you to import any amount of data that can come in real time or batch. With Amazon Simple Storage Service (Amazon S3), you can cost-effectively build and scale a data lake of any size in a secure environment where data is protected by 99.999999999% (11 9s) of durability.

After the data lands in your data lake, you can start processing this data using any Big Data processing tool of your choice. Amazon Athena is a fully managed interactive query service that enables you to analyze data stored in an Amazon S3-based data lake using standard SQL. You can also integrate Athena with Amazon QuickSight for easy visualization of the data.

When working with Athena, you can employ a few best practices to reduce cost and improve performance. Converting to columnar formats, partitioning, and bucketing your data are some of the best practices outlined in Top 10 Performance Tuning Tips for Amazon Athena. Bucketing is a technique that groups data based on specific columns together within a single partition. These columns are known as bucket keys. By grouping related data together into a single bucket (a file within a partition), you significantly reduce the amount of data scanned by Athena, thus improving query performance and reducing cost. For example, imagine collecting and storing clickstream data. If you frequently filter or aggregate by user ID, then within a single partition it's better to store all rows for the same user together. If user data isn't stored together, then Athena has to scan multiple files to retrieve the user's records. This leads to more files being scanned, and therefore, an increase in query runtime and cost.

Like partitioning, columns that are frequently used to filter the data are good candidates for bucketing. However, unlike partitioning, with bucketing it's better to use columns with high cardinality as a bucketing key. For example, Year and Month columns are good candidates for partition keys, whereas userID and sensorID are good examples of bucket keys. By doing this, you make sure that all buckets have a similar number of rows. For more information, see Bucketing vs Partitioning.

For real-time data (such as data coming from sensors or clickstream data), streaming tools like Amazon Kinesis Data Firehose can convert the data to columnar formats and partition it while writing to Amazon S3. With Kafka, you can do the same thing with connectors. But what about bucketing? This post shows how to continuously bucket streaming data using AWS Lambda and Athena.

Overview of solution

The following diagram shows the high-level architecture of the solution.

The architecture includes the following steps:

  1. We use the Amazon Kinesis Data Generator (KDG) to simulate streaming data. Data is then written into Kinesis Data Firehose, a fully managed service that enables you to load streaming data to an Amazon S3-based data lake.
  2. Kinesis Data Firehose partitions the data by hour and writes new JSON files into the current partition in a /raw folder. Each new partition looks like /raw/dt=<YYYY-MM-dd-HH>. Every hour, a new partition is created.
  3. Two Lambda functions are triggered on an hourly basis based on Amazon CloudWatch Events.
    • Function 1 (LoadPartition) runs every hour to load new /raw partitions to Athena SourceTable, which points to the /raw prefix.
    • Function 2 (Bucketing) runs the Athena CREATE TABLE AS SELECT (CTAS) query.
  4. The CTAS query copies the previous hour's data from /raw to /curated and buckets the data while doing so. It loads the new data as a new partition to TargetTable, which points to the /curated prefix.

Overview of walkthrough

In this post, we cover the following high-level steps:

  1. Install and configure the KDG.
  2. Create a Kinesis Data Firehose delivery stream.
  3. Create the database and tables in Athena.
  4. Create the Lambda functions and schedule them.
  5. Test the solution.
  6. Create a view that combines data from both tables.
  7. Clean up.

Installing and configuring the KDG

First, we need to install and configure the KDG in our AWS account. To do this, we use the AWS CloudFormation template provided with the KDG.

For more information about installing the KDG, see the KDG Guide in GitHub.

To configure the KDG, complete the following steps:

  1. On the AWS CloudFormation console, locate the stack you just created.
  2. On the Outputs tab, record the value for KinesisDataGeneratorUrl.
  3. Log in to the KDG main page using the credentials created when you deployed the CloudFormation template.
  4. In the Record template section, enter a record template like the sketch after these steps. Each record has three fields: sensorID, currentTemperature, and status.
  5. Choose Test template.
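The record template itself isn't reproduced on this page. A template along these lines, using the KDG's faker-style value generators, produces records with the three fields listed above (the specific generators and value ranges are illustrative assumptions):

    {
      "sensorID": {{random.number(4000)}},
      "currentTemperature": {{random.number({"min":10,"max":150})}},
      "status": "{{random.arrayElement(["OK","FAIL","WARN"])}}"
    }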

The result should look like the following screenshot.

We don't start sending data now; we do this after creating all other resources.

Creating a Kinesis Data Firehose delivery stream

Next, we create the Kinesis Data Firehose delivery stream that is used to load the data to the S3 bucket.

  1. On the Amazon Kinesis console, choose Kinesis Data Firehose.
  2. Choose Create delivery stream.
  3. For Delivery stream name, enter a name, such as AutoBucketingKDF.
  4. For Source, select Direct PUT or other sources.
  5. Leave all other settings at their default and choose Next.
  6. On the Process records page, leave everything at its default and choose Next.
  7. Choose Amazon S3 as the destination and choose your S3 bucket from the drop-down menu (or create a new one). For this post, I already have a bucket created.
  8. For S3 Prefix, enter the following prefix:
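The prefix itself isn't reproduced on this page. Using Kinesis Data Firehose's custom prefix expressions, it could look like the following (the raw/ folder name matches the layout used throughout this post; adjust as needed):

    raw/dt=!{timestamp:yyyy-MM-dd-HH}/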

We use custom prefixes to tell Kinesis Data Firehose to create a new partition every hour. Each partition looks like this: dt=YYYY-MM-dd-HH. This partition-naming convention conforms to the Hive partition-naming convention, <PartitionKey>=<PartitionValue>. In this case, <PartitionKey> is dt and <PartitionValue> is YYYY-MM-dd-HH. By doing this, we implement a flat partitioning model instead of hierarchical (year=YYYY/month=MM/day=dd/hour=HH) partitions. This model can be much simpler for end-users to work with, and you can use a single column (dt) to filter the data. For more information on flat vs. hierarchical partitions, see Data Lake Storage Foundation on GitHub.

  1. For S3 error prefix, enter an error prefix (a sample is shown after this list).
  2. On the Settings page, leave everything at its default.
  3. Choose Create delivery stream.
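The S3 error prefix for step 1 above isn't reproduced on this page either. A plausible value, using Firehose's error-output-type expression (the FirehoseFailures/ folder name is an assumption):

    FirehoseFailures/!{firehose:error-output-type}/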

Creating an Athena database and tables

In this solution, the Athena database has two tables: SourceTable and TargetTable. Both tables have identical schemas and will have the same data eventually. However, each table points to a different S3 location. Moreover, because data is stored in different formats, Athena uses a different SerDe for each table to parse the data. SourceTable uses JSON SerDe and TargetTable uses Parquet SerDe. One other difference is that SourceTable's data isn't bucketed, whereas TargetTable's data is bucketed.

In this step, we create both tables and the database that groups them.

  1. On the Athena console, create a new database by running the first statement in the sketch after this list.
  2. Choose the database that was created and run the second statement to create SourceTable. Replace <s3_bucket_name> with the bucket name you used when creating the Kinesis Data Firehose delivery stream.
  3. Run the third statement, a CTAS statement, to create TargetTable.
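The original statements aren't reproduced on this page. The following sketches (numbered to match the steps above) are based on the schema used in this post — sensorID, currentTemperature, and status, plus the dt partition key — the bucket count of 3 described below, and the database name mydatabase referenced in the cleanup section. The column types and the choice of JSON SerDe are assumptions:

    -- Statement 1: the database that groups both tables.
    CREATE DATABASE IF NOT EXISTS mydatabase;

    -- Statement 2: SourceTable reads the raw JSON written by Kinesis Data
    -- Firehose under /raw, partitioned by the flat dt key (YYYY-MM-dd-HH).
    -- Column types and the SerDe are assumptions; adjust them to your data.
    CREATE EXTERNAL TABLE mydatabase.SourceTable (
      sensorID           int,
      currentTemperature int,
      status             string
    )
    PARTITIONED BY (dt string)
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    LOCATION 's3://<s3_bucket_name>/raw/';

    -- Statement 3: TargetTable has the same schema but stores Parquet under
    -- /curated, bucketed by sensorID into 3 buckets. Because SourceTable is
    -- still empty, this CTAS only creates the table definition.
    CREATE TABLE mydatabase.TargetTable
    WITH (
      format            = 'PARQUET',
      external_location = 's3://<s3_bucket_name>/curated/',
      partitioned_by    = ARRAY['dt'],
      bucketed_by       = ARRAY['sensorID'],
      bucket_count      = 3
    ) AS
    SELECT sensorID, currentTemperature, status, dt
    FROM mydatabase.SourceTable;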

SourceTable doesn't have any data yet. However, the preceding query creates the table definition in the Data Catalog. We configured this data to be bucketed by sensorID (the bucketing key) with a bucket count of 3. Ideally, the number of buckets should be chosen so that the files are of optimal size.

Creating Lambda functions

The solution has two Lambda functions: LoadPartition and Bucketing. We use an AWS Serverless Application Model (AWS SAM) template to create, deploy, and schedule both functions.

Follow the instructions in the GitHub repo to deploy the template. When deploying the template, it asks you for some parameters. You can use the default parameters, but you have to change S3BucketName and AthenaResultLocation. For more information, see Parameter Details in the GitHub repo.

LoadPartition function

The LoadPartition function is scheduled to run the first minute of every hour. Every time Kinesis Data Firehose creates a new partition in the /raw folder, this function loads the new partition to SourceTable. This is crucial because the second function (Bucketing) reads this partition the following hour to copy the data to /curated.
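The function's code lives in the GitHub repo and isn't reproduced here. Conceptually, each run registers the partition Kinesis Data Firehose has just started writing to, with a DDL statement along these lines (a sketch; the repo's implementation may differ):

    -- Register the newly created /raw partition with SourceTable.
    -- The dt value and <s3_bucket_name> are filled in by the function at runtime.
    ALTER TABLE mydatabase.SourceTable
    ADD IF NOT EXISTS PARTITION (dt = '<YYYY-MM-dd-HH>')
    LOCATION 's3://<s3_bucket_name>/raw/dt=<YYYY-MM-dd-HH>/';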

Bucketing function

The Bucketing function is scheduled to run the first minute of every hour. It copies the last hour's data from SourceTable to TargetTable. It does so by creating a tempTable using a CTAS query. This tempTable points to the new date-hour folder under /curated; this folder is then added as a single partition to TargetTable.

To implement this, the function runs three queries sequentially. The queries use two parameters:

  • <s3_bucket_name> – Defined by an AWS SAM parameter and should be the same bucket used throughout this solution
  • <last_hour_partition> – Calculated by the function depending on which hour it's running

The function first creates TempTable as the result of a SELECT statement from SourceTable. It stores the results in a new folder under /curated. The results are bucketed and stored in Parquet format.
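A sketch of this first query, using the two parameters above (the exact query in the repo may differ; the database name mydatabase and the bucket count of 3 carry over from the earlier table sketches):

    -- Copy the previous hour from SourceTable into a bucketed Parquet folder
    -- under /curated; this folder becomes the next TargetTable partition.
    CREATE TABLE mydatabase.TempTable
    WITH (
      format            = 'PARQUET',
      external_location = 's3://<s3_bucket_name>/curated/dt=<last_hour_partition>/',
      bucketed_by       = ARRAY['sensorID'],
      bucket_count      = 3
    ) AS
    SELECT sensorID, currentTemperature, status
    FROM mydatabase.SourceTable
    WHERE dt = '<last_hour_partition>';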

We create a new subfolder in /curated, which is a new partition for TargetTable. Then, after the TempTable creation is complete, we load the new partition to TargetTable.
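A sketch of the second query (same assumptions as above):

    -- Expose the folder written by the CTAS above as a new TargetTable partition.
    ALTER TABLE mydatabase.TargetTable
    ADD IF NOT EXISTS PARTITION (dt = '<last_hour_partition>')
    LOCATION 's3://<s3_bucket_name>/curated/dt=<last_hour_partition>/';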

Finally, we delete tempTable from the Data Catalog.
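A sketch of the third query; dropping the table removes only its metadata, while the Parquet files under /curated remain and are now served through TargetTable's new partition:

    DROP TABLE IF EXISTS mydatabase.TempTable;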

Testing the solution

Now that we have created all resources, it's time to test the solution. We start by generating data from the KDG and waiting for an hour to start querying data in TargetTable (the bucketed table).

  1. Log in to the KDG. You should find the template you created earlier. For the configuration, choose the following:
    1. The Region used.
    2. For the delivery stream, choose the Kinesis Data Firehose delivery stream you created earlier.
    3. For records/sec, enter 3000.
  2. Choose Send data.

The KDG starts sending simulated data to Kinesis Data Firehose. After 1 minute, a new partition should be created in Amazon S3.

The Lambda function that loads the partition to SourceTable runs on the first minute of the hour. If you started sending data after the first minute, this partition is missed because the next run loads the next hour's partition, not this one. To mitigate this, run MSCK REPAIR TABLE SourceTable only for the first hour.

  1. To benchmark the performance between both tables, wait for an hour so that the data is available for querying in TargetTable.
  2. When the data is available, choose one sensorID and run the following query on SourceTable and TargetTable.
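The exact benchmark query isn't reproduced on this page; any query that filters on the chosen sensorID works for the comparison. A sketch (the aggregation and the sensorID value 290 are illustrative):

    -- Run once against SourceTable, then again against TargetTable.
    SELECT status,
           count(*)                AS readings,
           avg(currentTemperature) AS avg_temperature
    FROM mydatabase.SourceTable
    WHERE sensorID = 290
    GROUP BY status;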

The following screenshot shows the query results for SourceTable. It shows the runtime in seconds and the amount of data scanned.

The following screenshot shows the query results for TargetTable.

If you look at these results, you don't see a huge difference in runtime for this specific query and dataset; for other datasets, this difference should be more significant. However, from a data scanning perspective, after bucketing the data, we reduced the data scanned by approximately 98%. Therefore, for this specific use case, bucketing the data led to a 98% reduction in Athena costs because you're charged based on the amount of data scanned by each query.

Querying the current hour's data

Data for the current hour isn't available immediately in TargetTable. It's available for querying after the first minute of the following hour. To query this data immediately, we have to create a view that unions the previous hour's data from TargetTable with the current hour's data from SourceTable. If data is required for analysis after an hour of its arrival, then you don't need to create this view.

To create this view, run the following query in Athena:
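The original view definition isn't reproduced on this page; a minimal sketch could look like the following (the view name CombinedView is an assumption, and the current-hour filter assumes Athena's session time zone matches the UTC timestamps Kinesis Data Firehose uses in the dt prefix):

    -- Combine all bucketed data in TargetTable with the hour that the
    -- Bucketing function has not copied yet, read directly from SourceTable.
    CREATE OR REPLACE VIEW mydatabase.CombinedView AS
    SELECT sensorID, currentTemperature, status, dt
    FROM mydatabase.TargetTable
    UNION ALL
    SELECT sensorID, currentTemperature, status, dt
    FROM mydatabase.SourceTable
    WHERE dt >= date_format(now(), '%Y-%m-%d-%H');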

Cleaning up

Delete the resources you created if you no longer need them.

  1. Delete the Kinesis Data Firehose delivery stream.
  2. In Athena, run the following statements:
    1. DROP TABLE SourceTable
    2. DROP TABLE TargetTable
    3. DROP DATABASE mydatabase
  3. Delete the AWS SAM template to delete the Lambda functions.
  4. Delete the CloudFormation stack for the KDG. For more information, see Deleting a stack on the AWS CloudFormation console.

Conclusion

Bucketing is a powerful technique and can significantly improve performance and reduce Athena costs. In this post, we saw how to continuously bucket streaming data using Lambda and Athena. We used a simulated dataset generated by the Kinesis Data Generator. The same solution can apply to any production data, with the following changes:

  • DDL statements
  • The functions used work with data that is partitioned by hour with the partition key 'dt' and partition value <YYYY-MM-dd-HH>. If your data is partitioned in a different way, edit the Lambda functions appropriately.
  • Frequency of Lambda triggers.

About the Author

Ahmed Zamzam is a Solutions Architect with Amazon Web Services. He supports SMB customers in the UK in their digital transformation and their cloud journey to AWS, and specializes in Data Analytics. Outside of work, he loves traveling, hiking, and cycling.

Source: https://aws.amazon.com/blogs/big-data/automating-bucketing-of-streaming-data-using-amazon-athena-and-aws-lambda/
