
Effective data lakes using AWS Lake Formation, Part 2: Creating a governed table for streaming data sources


We announced the preview of AWS Lake Formation transactions, row-level security, and acceleration at AWS re:Invent 2020. In Part 1 of this series, we explained how to set up a governed table and add objects to it. In this post, we expand on that example and demonstrate how to ingest streaming data into governed tables using Lake Formation transactions.

In typical streaming use cases, new data is continuously ingested into a data lake. This creates many small files that may impact query performance. In addition, a common requirement is to isolate queries from updates without requiring downtime or having to wait a long time for the updates to settle. Finally, some use cases such as periodic reports and iterative machine learning training require the ability to go back in time to a specific timestamp and query against it.

Lake Formation transactions provide an easy mechanism to add streaming datasets to your data lake while isolating in-process queries from the updates, so ingestion and queries can run concurrently without blocking each other. Lake Formation acceleration automatically compacts small files to improve query performance.

In this post, we demonstrate how to ingest streaming data into your data lake using Lake Formation transactions in real time.


Architecture overview

AWS CloudTrail is an AWS service that helps you enable governance, compliance, and operational and risk auditing of your AWS account. Actions taken by a user, role, or an AWS service are recorded as events in CloudTrail. We relay these CloudTrail events into Amazon Kinesis Data Streams, process this data stream using an AWS Glue streaming job, and store the data in Amazon Simple Storage Service (Amazon S3) using AWS Lake Formation transactions.

The following diagram shows the solution architecture.

Setting up resources with AWS CloudFormation

This post provides an AWS CloudFormation template to get you started quickly. You can review and customize it to suit your needs. Some of the resources deployed by this stack incur costs when in use. If you prefer to set up these resources manually using the AWS Management Console, see the instructions in the appendix at the end of this post.

The CloudFormation template generates the following resources:

When following the steps in this section, use the us-east-1 Region because, as of this writing, this Lake Formation preview feature is available only in us-east-1.

To create these resources, complete the following steps:

  1. Sign in to the CloudFormation console in the us-east-1 Region.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For DatalakeAdminUserName and DatalakeAdminUserPassword, enter the IAM user name and password for the data lake admin user.
  5. For DataAnalystUserName and DataAnalystUserPassword, enter the IAM user name and password for the data analyst user.
  6. For DataLakeBucketName, enter the name of your data lake bucket.
  7. For DatabaseName and TableName, leave as the default.
  8. Choose Next.
  9. On the next page, choose Next.
  10. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  11. Choose Create.

Stack creation should take approximately 3 minutes.
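If you prefer to launch the stack programmatically rather than through the console, a minimal boto3 sketch is shown below. The stack name and template URL are placeholders (use the template behind the Launch Stack button), and only a subset of the parameters from the steps above is shown:

import boto3

cfn = boto3.client("cloudformation", region_name="us-east-1")

# CAPABILITY_NAMED_IAM is required because the template creates IAM users and roles.
cfn.create_stack(
    StackName="lakeformation-tutorial-cloudtrail",  # placeholder stack name
    TemplateURL="https://<template-bucket>.s3.amazonaws.com/<template>.yaml",  # placeholder
    Parameters=[
        {"ParameterKey": "DatalakeAdminUserName", "ParameterValue": "DatalakeAdmin2"},
        {"ParameterKey": "DataLakeBucketName", "ParameterValue": "<datalake-bucket>"},
        # Pass the remaining parameters (passwords, DataAnalystUserName, DatabaseName, TableName) the same way.
    ],
    Capabilities=["CAPABILITY_NAMED_IAM"],
)

# Wait until stack creation is complete (roughly 3 minutes).
cfn.get_waiter("stack_create_complete").wait(StackName="lakeformation-tutorial-cloudtrail")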

Setting up a governed table

Now you can create and configure your governed table in Lake Formation.

Creating a governed table

To create your governed table, complete the following steps:

  1. Sign in to the Lake Formation console in the us-east-1 Region using the DatalakeAdmin2 user.
  2. Choose Tables.
  3. Choose Create table.
  4. For Name, enter cloudtrail_governed.
  5. For Database, enter lakeformation_tutorial_cloudtrail.
  6. Select Enable governed data access and management.
  7. Select Enable row based permissions.
  8. For Data is located in, choose Specified path in my account.
  9. Enter the path s3://<datalake-bucket>/data/cloudtrail_governed/.
  10. For Classification, choose PARQUET.
  11. Choose Upload Schema.
  12. Enter the following JSON array into the text box:
[
    {
        "Name": "eventversion",
        "Type": "string"
    },
    {
        "Name": "eventtime",
        "Type": "string"
    },
    {
        "Name": "awsregion",
        "Type": "string"
    },
    {
        "Name": "sourceipaddress",
        "Type": "string"
    },
    {
        "Name": "useragent",
        "Type": "string"
    },
    {
        "Name": "errorcode",
        "Type": "string"
    },
    {
        "Name": "errormessage",
        "Type": "string"
    },
    {
        "Name": "requestid",
        "Type": "string"
    },
    {
        "Name": "eventid",
        "Type": "string"
    },
    {
        "Name": "eventtype",
        "Type": "string"
    },
    {
        "Name": "apiversion",
        "Type": "string"
    },
    {
        "Name": "readonly",
        "Type": "string"
    },
    {
        "Name": "recipientaccountid",
        "Type": "string"
    },
    {
        "Name": "sharedeventid",
        "Type": "string"
    },
    {
        "Name": "vpcendpointid",
        "Type": "string"
    }
]
  13. Choose Upload.

We now add two partition columns: eventsource and eventname.

  1. Choose Add column.
  2. For Column name, enter eventsource.
  3. For Data type, choose String.
  4. Select Partition Key.
  5. Choose Add.
  6. Choose Add column.
  7. For Column name, enter eventname.
  8. For Data type, choose String.
  9. Select Partition Key.
  10. Choose Add.
  11. Choose Submit.

Configuring Lake Formation permissions

You need to grant Lake Formation permissions for your governed table. Complete the following steps:

Table-level permissions

  1. Sign in to the Lake Formation console in the us-east-1 Region using the DatalakeAdmin2 user.
  2. Under Permissions, choose Data permissions.
  3. Under Data permission, choose Grant.
  4. For Database, choose lakeformation_tutorial_cloudtrail.
  5. For Table, choose cloudtrail_governed.
  6. For IAM users and roles, choose the roles GlueETLServiceRole-<CloudFormation stack name> and LFRegisterLocationServiceRole-<CloudFormation stack name>, and the user DatalakeAdmin2.
  7. Select Table permissions.
  8. Under Table permissions, select Alter, Insert, Drop, Delete, Select, and Describe.
  9. Choose Grant.
  10. Under Data permission, choose Grant.
  11. For Database, choose lakeformation_tutorial_cloudtrail.
  12. For Table, choose cloudtrail_governed.
  13. For IAM users and roles, choose the user DataAnalyst2.
  14. Under Table permissions, select Select and Describe.
  15. Choose Grant.
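If you prefer to script these grants, the same table-level permissions can be issued through the Lake Formation GrantPermissions API. The following is a minimal boto3 sketch for the DataAnalyst2 grant; the account ID is a placeholder, and the same pattern applies to the roles and the DatalakeAdmin2 user with their respective permission lists:

import boto3

lf = boto3.client("lakeformation", region_name="us-east-1")

# Grant SELECT and DESCRIBE on the governed table to the DataAnalyst2 user.
lf.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": "arn:aws:iam::<account-id>:user/DataAnalyst2"},
    Resource={
        "Table": {
            "DatabaseName": "lakeformation_tutorial_cloudtrail",
            "Name": "cloudtrail_governed",
        }
    },
    Permissions=["SELECT", "DESCRIBE"],
)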

Row-level permissions
Previously, we enabled row-level permissions on the table cloudtrail_governed. You need to grant row-level permissions to IAM roles and users so that your AWS Glue job and Lake Formation compaction can process the table, and your IAM users can read the table in Athena queries.

  1. Under Permissions, choose Data permissions.
  2. Under Data permission, choose Grant.
  3. For Database, choose lakeformation_tutorial_cloudtrail.
  4. For Table, choose cloudtrail_governed.
  5. For IAM users and roles, choose the roles GlueETLServiceRole-<CloudFormation stack name> and LFRegisterLocationServiceRole-<CloudFormation stack name>, and the users DatalakeAdmin2 and DataAnalyst2.
  6. Select Row-based permissions.
  7. For Filter name, enter allowAll.
  8. For Choose filter type, select Allow access to all rows.
  9. Choose Grant.

Data location permissions

  1. Under Permissions, choose Data locations.
  2. Under Data locations, choose Grant.
  3. For IAM users and roles, choose the role GlueETLServiceRole-<CloudFormation stack name>.
  4. For Storage location, enter s3://<datalake-bucket>/.
  5. Choose Grant.

Adding table objects into the governed table

If you want to copy data from your existing tables (such as JDBC or Amazon S3) into a governed table, it’s a good idea to run a standard AWS Glue job to load the data. On the other hand, if you want to stream data from your streaming sources (Kinesis or Kafka) into a governed table, it’s a good idea to run an AWS Glue streaming job to load the data.

Even if you place your data in the table location of the governed table, the data isn't recognized yet. To make the governed table aware of the data, you need to add the files containing your data to the governed table using Lake Formation transactions.

To add S3 objects to a governed table, call the UpdateTableObjects API for the objects. You can call it using the AWS Command Line Interface (AWS CLI) or SDKs, or through the AWS Glue ETL library, which calls the API implicitly. For this post, we use an AWS Glue streaming job to ingest CloudTrail data from Kinesis Data Streams into a governed table; the AWS Glue ETL library puts objects into Amazon S3 and calls the UpdateTableObjects API for those S3 objects implicitly. Let's run the AWS Glue job cloudtrail_ingestion-<CloudFormation stack name>, which is automatically created by the CloudFormation template.

  1. On the AWS Glue console, choose Jobs.
  2. Select the job named cloudtrail_ingestion-<CloudFormation stack name>.
  3. On the Actions menu, choose Run job.

This job reads from the data stream, and writes into the governed table cloudtrail_governed at 100-second intervals. After a few minutes, you can see new files being written into the data lake bucket.
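Conceptually, the job wraps each micro-batch in a Lake Formation transaction: it begins a transaction, writes Parquet files to the table location, registers those files with UpdateTableObjects, and commits (see the full script in the appendix). The following boto3 sketch illustrates that sequence for a single object; it is not the library's actual implementation, the object URI, ETag, size, and partition values are placeholders, and the snake_case method names follow later SDK releases of the transaction API used in this preview:

import boto3

lf = boto3.client("lakeformation", region_name="us-east-1")

# Begin a read/write transaction for this batch.
tx_id = lf.begin_transaction(TransactionType="READ_AND_WRITE")["TransactionId"]

# Register a newly written S3 object with the governed table inside the transaction.
lf.update_table_objects(
    DatabaseName="lakeformation_tutorial_cloudtrail",
    TableName="cloudtrail_governed",
    TransactionId=tx_id,
    WriteOperations=[{
        "AddObject": {
            "Uri": "s3://<datalake-bucket>/data/cloudtrail_governed/part-00000.snappy.parquet",  # placeholder
            "ETag": "<object-etag>",  # placeholder
            "Size": 1024,             # placeholder
            "PartitionValues": ["s3.amazonaws.com", "GetObject"],  # example eventsource/eventname values
        }
    }],
)

# Make the object visible to queries; on failure, abort the transaction instead
# (the lakeformation:AbortTransaction action in the preview API).
lf.commit_transaction(TransactionId=tx_id)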

Querying a governed table using Amazon Athena

Now everything is ready! Let’s start querying the governed table using Amazon Athena.

If it’s your first time running queries on Athena, you need to configure the query result location. For more information, see Specifying a Query Result Location.

To utilize Lake Formation preview features, you need to create a special workgroup named AmazonAthenaLakeFormationPreview, and join the workgroup. For more information, see Managing Workgroups.
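Creating the preview workgroup can also be scripted; the following is a minimal boto3 sketch, with the query result location as a placeholder:

import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Lake Formation preview queries must run in this specially named workgroup.
athena.create_work_group(
    Name="AmazonAthenaLakeFormationPreview",
    Configuration={
        "ResultConfiguration": {
            "OutputLocation": "s3://<query-results-bucket>/athena-results/"  # placeholder
        }
    },
)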

Running a simple query

Sign in to the Athena console in the us-east-1 Region using the DataAnalyst2 user. First, let’s preview the table by querying 10 records stored in the governed table:

SELECT * 
FROM lakeformation.lakeformation_tutorial_cloudtrail.cloudtrail_governed
LIMIT 10;

The following screenshot shows the query result.

Running an analytic query

Next, let’s run an analytic query with aggregation to simulate real-world use cases:

SELECT count (*) as TotalEvents, eventsource, eventname
FROM lakeformation.lakeformation_tutorial_cloudtrail.cloudtrail_governed
GROUP BY eventsource, eventname
ORDER BY TotalEvents desc;

The following screenshot shows the results. This query returns the total API calls per eventsource and eventname.

Running an analytic query with time travel

With governed tables, you can go back in time to a specific timestamp and query against it. You can run Athena queries against governed tables that include a timestamp to target the state of the data at a particular date and time.

To submit a time travel query in Athena, add a WHERE clause that sets the column __asOfDate to the epoch time (long integer) representation of the required date and time.

To retrieve the epoch time from 5 minutes ago, you can run one of the following commands.

The command for Linux (GNU date command):

$ echo $(($(date -u -d '5 minutes ago' +%s%N)/1000000))
1613464200000

The command for OSX (BSD date command):

$ echo $(($(date -u -v-5M +%s) * 1000))
1613464200000
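If you prefer Python to the shell, the same value can be computed portably with the standard library:

from datetime import datetime, timedelta, timezone

# Epoch time in milliseconds, 5 minutes before now (UTC).
as_of = datetime.now(timezone.utc) - timedelta(minutes=5)
print(int(as_of.timestamp() * 1000))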

Let’s run the time travel query (replace <epoch-milliseconds> with the timestamp you retrieved):

SELECT count (*) as TotalEvents, eventsource, eventname
FROM lakeformation.lakeformation_tutorial_cloudtrail.cloudtrail_governed
WHERE __asOfDate = <epoch-milliseconds>
GROUP BY eventsource, eventname
ORDER BY TotalEvents desc;

The following screenshot shows the query result. The number of TotalEvents is less than that in the previous result because this result is pointing to the older timestamp.

Cleaning up


Now for the final step: cleaning up the resources.

  1. Empty the Amazon S3 data lake bucket, then delete the bucket.
  2. Delete the CloudFormation stack. The governed table you created is also deleted automatically when the stack is deleted.
  3. Delete the Athena workgroup AmazonAthenaLakeFormationPreview.
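If you prefer to script the cleanup, the following boto3 sketch performs the same three steps; the bucket and stack names are placeholders:

import boto3

# 1. Empty the data lake bucket, then delete it (use object_versions as well if versioning is enabled).
bucket = boto3.resource("s3").Bucket("<datalake-bucket>")
bucket.objects.all().delete()
bucket.delete()

# 2. Delete the CloudFormation stack, which also removes the governed table.
boto3.client("cloudformation", region_name="us-east-1").delete_stack(
    StackName="<CloudFormation stack name>"
)

# 3. Delete the Athena preview workgroup.
boto3.client("athena", region_name="us-east-1").delete_work_group(
    WorkGroup="AmazonAthenaLakeFormationPreview",
    RecursiveDeleteOption=True,
)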

Conclusion

In this post, we explained how to create a Lake Formation governed table with CloudTrail data coming from EventBridge via Kinesis Data Streams. In addition, we explained how to query governed tables and how to run time travel queries against them. With Lake Formation governed tables, you can achieve transactions, row-level security, and query acceleration. In Part 3 of this series, we will show you how to use ACID transactions on governed tables to perform multiple operations atomically and enforce isolation between operations.

Lake Formation transactions, row-level security, and acceleration are currently available for preview in the US East (N. Virginia) AWS Region. To get early access to these capabilities, sign up for the preview. You need to be approved for the preview to gain access to these features.



Appendix 1: Setting up resources via the console

Configuring IAM roles and IAM users

First, you need to set up two IAM roles: one for AWS Glue ETL jobs, another for the Lake Formation data lake location.

IAM policies

To create the IAM policies for your roles, complete the following steps:

  1. On the IAM console, create a new policy for Amazon S3.
  2. Save the IAM policy as S3DataLakePolicy as follows (replace <datalake-bucket> with your bucket name):
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::<datalake-bucket>/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<datalake-bucket>"
            ]
        }
    ]
}
  3. Create a new IAM policy named LFTransactionPolicy with the following statements:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "lakeformation:BeginTransaction",
                "lakeformation:CommitTransaction",
                "lakeformation:AbortTransaction",
                "lakeformation:ExtendTransaction",
                "lakeformation:GetTransaction",
                "lakeformation:GetTransactions",
                "lakeformation:UpdateTableObjects"
            ],
            "Resource": "*"
        }
    ]
}
  4. Create a new IAM policy named LFLocationPolicy with the following statements:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "LFPreview1",
            "Effect": "Allow",
            "Action": "execute-api:Invoke",
            "Resource": "arn:aws:execute-api:*:*:*/*/POST/reportStatus"
        },
        {
            "Sid": "LFPreview2",
            "Effect": "Allow",
            "Action": [
                "lakeformation:BeginTransaction",
                "lakeformation:CommitTransaction",
                "lakeformation:AbortTransaction",
                "lakeformation:GetTableObjects",
                "lakeformation:UpdateTableObjects"
            ],
            "Resource": "*"
        }
    ]
}
  5. Create a new IAM policy named LFQueryPolicy with the following statements:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "LFPreview1",
            "Effect": "Allow",
            "Action": "execute-api:Invoke",
            "Resource": "arn:aws:execute-api:*:*:*/*/POST/reportStatus"
        },
        {
            "Sid": "LFPreview2",
            "Effect": "Allow",
            "Action": [
                "lakeformation:BeginTransaction",
                "lakeformation:CommitTransaction",
                "lakeformation:AbortTransaction",
                "lakeformation:ExtendTransaction",
                "lakeformation:PlanQuery",
                "lakeformation:GetTableObjects",
                "lakeformation:GetQueryState",
                "lakeformation:GetWorkUnits",
                "lakeformation:Execute"
            ],
            "Resource": "*"
        }
    ]
}
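Each of these customer managed policies can also be created with a short boto3 call. The following sketch creates S3DataLakePolicy from the document in step 2; the other policies follow the same pattern:

import json
import boto3

iam = boto3.client("iam")

s3_datalake_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
            "Resource": ["arn:aws:s3:::<datalake-bucket>/*"],
        },
        {
            "Effect": "Allow",
            "Action": ["s3:ListBucket"],
            "Resource": ["arn:aws:s3:::<datalake-bucket>"],
        },
    ],
}

# Create the customer managed policy; repeat with the other policy documents above.
iam.create_policy(
    PolicyName="S3DataLakePolicy",
    PolicyDocument=json.dumps(s3_datalake_policy),
)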

IAM role for AWS Glue ETL job

Create a new IAM role to run AWS Glue ETL jobs.

  1. On the IAM console, create the role GlueETLServiceRole with the following trust relationship for AWS Glue:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "glue.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
  2. Attach the following AWS managed policies:
    1. AWSGlueServiceRole
    2. AWSLakeFormationDataAdmin
    3. AmazonKinesisReadOnlyAccess
  3. Attach the following customer managed policies:
    1. S3DataLakePolicy
    2. LFTransactionPolicy

IAM role for AWS Lake Formation data lake location

To create your IAM role for Lake Formation, complete the following steps:

  1. Create a new IAM role called LFRegisterLocationServiceRole with the following Lake Formation trust relationship:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "lakeformation.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
  2. Attach the following customer managed policies:
    1. S3DataLakePolicy
    2. LFLocationPolicy

This role is used to register locations with Lake Formation, which then performs credential vending for Athena at query time.

IAM users

To create your IAM users, complete the following steps:

  1. Create an IAM user named DatalakeAdmin2.
  2. Attach the following AWS managed policies:
    1. AWSLakeFormationDataAdmin
    2. AmazonAthenaFullAccess
    3. IAMReadOnlyAccess
  3. Attach the customer managed policy LFQueryPolicy.
  4. Create an IAM user named DataAnalyst2 who can use Athena to query data.
  5. Attach the AWS managed policy AmazonAthenaFullAccess.
  6. Attach the customer managed policy LFQueryPolicy.

Configuring Kinesis Data Streams

To create your data stream, complete the following steps:

  1. On the Kinesis Data Streams console, choose Create data stream.
  2. For Data stream name, enter cloudtrail.
  3. For Number of open shards, enter 2.
  4. Choose Create data stream.
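The equivalent boto3 call is a one-liner, sketched here with a waiter so the stream is ACTIVE before you wire up EventBridge:

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# Create the data stream with two shards.
kinesis.create_stream(StreamName="cloudtrail", ShardCount=2)

# Wait until the stream becomes ACTIVE.
kinesis.get_waiter("stream_exists").wait(StreamName="cloudtrail")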

Configuring EventBridge

For this post, we set up an EventBridge rule to capture CloudTrail API logs and relay them to Kinesis Data Streams.

  1. On the EventBridge console, choose Rules.
  2. Choose Create rule.
  3. For Name, enter cloudtrail-to-kinesis.
  4. Select Event pattern.
  5. For Event matching pattern, choose Custom pattern.
  6. For Event pattern, enter the following JSON (replace <account-id> with your AWS account ID):
{
  "detail-type": [
    "AWS API Call via CloudTrail"
  ],
  "account": [
    "<account-id>"
  ]
}
  7. For Target, choose Kinesis stream.
  8. For Stream, choose cloudtrail.
  9. For Configure input, choose Part of the matched event.
  10. Enter $.detail.
  11. Choose Create.
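The same rule and target can be created through the EventBridge API. A minimal boto3 sketch follows; note that when you script this instead of using the console, EventBridge needs an IAM role that allows kinesis:PutRecord on the stream, so the role ARN below is a placeholder:

import json
import boto3

events = boto3.client("events", region_name="us-east-1")

# Rule matching all CloudTrail API call events in this account.
events.put_rule(
    Name="cloudtrail-to-kinesis",
    EventPattern=json.dumps({
        "detail-type": ["AWS API Call via CloudTrail"],
        "account": ["<account-id>"],
    }),
)

# Target the Kinesis stream and forward only the "detail" part of each matched event.
events.put_targets(
    Rule="cloudtrail-to-kinesis",
    Targets=[{
        "Id": "cloudtrail-stream",
        "Arn": "arn:aws:kinesis:us-east-1:<account-id>:stream/cloudtrail",
        "RoleArn": "arn:aws:iam::<account-id>:role/<eventbridge-to-kinesis-role>",  # placeholder
        "InputPath": "$.detail",
    }],
)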

Creating a table for the data stream in the Glue Data Catalog

In this step, you create a table for your data stream in the AWS Glue Data Catalog.

  1. On the AWS Glue console, choose Tables.
  2. Choose Add tables.
  3. Select Add table manually.
  4. For Table name, enter cloudtrail_kinesis.
  5. For Database, enter lakeformation_tutorial_cloudtrail.
  6. Choose Next.
  7. For Select the type of source, choose Kinesis.
  8. For Stream name, enter cloudtrail.
  9. For Kinesis source URL, enter https://kinesis.us-east-1.amazonaws.com.
  10. Choose Next.
  11. For Classification, choose JSON.
  12. Choose Next.
  13. Choose Add column.
  14. For Column name, enter eventversion.
  15. For Column type, choose string.
  16. Choose Add.
  17. Repeat steps 13-16 for the following columns:
    1. Column name: useridentity, Column type: struct, Struct schema: STRUCT<type:STRING,principalid:STRING,arn:STRING,accountid:STRING,invokedby:STRING,accesskeyid:STRING,userName:STRING,sessioncontext:STRUCT<attributes:STRUCT<mfaauthenticated:STRING,creationdate:STRING>,sessionissuer:STRUCT<type:STRING,principalId:STRING,arn:STRING,accountId:STRING,userName:STRING>>>
    2. Column name: eventtime, Column type: string
    3. Column name: eventsource, Column type: string
    4. Column name: eventname, Column type: string
    5. Column name: sourceipaddress, Column type: string
    6. Column name: useragent, Column type: string
    7. Column name: errorcode, Column type: string
    8. Column name: errormessage, Column type: string
    9. Column name: requestparameters, Column type: string
    10. Column name: responseelements, Column type: string
    11. Column name: additionaleventdata, Column type: string
    12. Column name: requestid, Column type: string
    13. Column name: eventid, Column type: string
    14. Column name: resources, Column type: array, Array schema: ARRAY<STRUCT<ARN:STRING,accountId:STRING,type:STRING>>
    15. Column name: eventtype, Column type: string
    16. Column name: apiversion, Column type: string
    17. Column name: readonly, Column type: boolean
    18. Column name: recipientaccountid, Column type: string
    19. Column name: serviceeventdetails, Column type: string
    20. Column name: sharedeventid, Column type: string
    21. Column name: vpcendpointid, Column type: string
  18. Choose Next.
  19. Choose Finish.

Configuring Lake Formation

Complete the following steps to configure Lake Formation permissions:

Data lake settings

  1. On the Lake Formation console, under Permissions, choose Admins and database creators.
  2. In the Data lake administrators section, choose Grant.
  3. For IAM users and roles, choose your IAM user DatalakeAdmin2.
  4. Choose Save.
  5. In the Database creators section, choose Grant.
  6. For IAM users and roles, choose the LFRegisterLocationServiceRole.
  7. Select Create Database.
  8. Choose Grant.

Data lake locations

  1. Under Register and ingest, choose Data lake locations.
  2. Choose Register location.
  3. For Amazon S3 path, enter s3://<datalake-bucket>/. This needs to be the same bucket you listed in LFLocationPolicy. Lake Formation uses this role to vend temporary Amazon S3 credentials to query services that need read/write access to the bucket and all prefixes under it.
  4. For IAM role, choose the LFRegisterLocationServiceRole.
  5. Choose Register location.
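The registration can also be performed with a single RegisterResource call; a minimal boto3 sketch (the account ID and bucket name are placeholders):

import boto3

lf = boto3.client("lakeformation", region_name="us-east-1")

# Register the data lake location with the custom role so Lake Formation can vend
# temporary credentials for it at query time.
lf.register_resource(
    ResourceArn="arn:aws:s3:::<datalake-bucket>",
    RoleArn="arn:aws:iam::<account-id>:role/LFRegisterLocationServiceRole",
)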

Data catalog settings

  1. Under Data catalog, choose Settings.
  2. Make sure that both check boxes for Use only IAM access control for new databases and Use only IAM access control for new tables in new databases are deselected.
  3. Under Data catalog, choose Databases.
  4. Choose Create database.
  5. Select Database.
  6. For Name, enter lakeformation_tutorial_cloudtrail.
  7. Choose Create database.

Database-level permissions

  1. Under Permissions, choose Data permissions.
  2. Under Data permission, choose Grant.
  3. For Database, choose lakeformation_tutorial_cloudtrail.
  4. For IAM users and roles, choose the role GlueETLServiceRole.
  5. Select Database permissions.
  6. Under Database permissions, select Create table, Alter, Drop, and Describe.
  7. Choose Grant.

This grants the GlueETLServiceRole role permission to create and alter tables in the database, which it needs in order to create a governed table.

Table-level permissions

  1. Under Permissions, choose Data permissions.
  2. Under Data permission, choose Grant.
  3. For Database, choose lakeformation_tutorial_cloudtrail.
  4. For Table, choose cloudtrail_kinesis.
  5. For IAM users and roles, choose the role GlueETLServiceRole.
  6. Select Table permissions.
  7. Under Table permissions, select Select and Describe.
  8. Choose Grant.

Creating an AWS Glue streaming job

To create your AWS Glue streaming job, complete the following steps:

  1. On the AWS Glue console, choose Jobs.
  2. Choose Add job.
  3. For Name, enter cloudtrail_ingestion.
  4. For IAM role, choose GlueETLServiceRole.
  5. For Type, choose Spark Streaming.
  6. For Glue version, choose Spark 2.4, Python 3 with improved job startup times (Glue Version 2.0).
  7. For This job runs, choose A new script authored by you.
  8. For S3 path where the script is stored, enter s3://aws-glue-scripts-<account-id>-us-east-1/lakeformation_tutorial_cloudtrail.
  9. For Temporary directory, enter s3://aws-glue-temporary-<account-id>-us-east-1/lakeformation_tutorial_cloudtrail.
  10. Under Security configuration, script libraries, and job parameters (optional), for Job parameters, provide the following:
    1. --datalake_bucket_name: <datalake-bucket>
    2. --database_name: lakeformation_tutorial_cloudtrail
    3. --src_table_name: cloudtrail_kinesis
    4. --dst_table_name: cloudtrail_governed
  11. Choose Next.
  12. Choose Save job and edit script (don't choose any connections here).
  13. Enter the following code:
# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import sys
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame, Row
from awsglue.dynamicframe import DynamicFrameCollection
from awsglue.dynamicframe import DynamicFrame
from awsglue import DynamicFrame

"""
Job Parameters
----------
datalake_bucket_name : string
    S3 bucket name for storing a governed table
database_name : string
    A Lake Formation database which stores a governed table and the source Kinesis table
src_table_name : string
    A Glue table name for a Kinesis Data Stream
dst_table_name : string
    A Lake Formation governed table name
"""
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'datalake_bucket_name', 'database_name', 'src_table_name', 'dst_table_name'])
datalake_bucket_name = args["datalake_bucket_name"]
database_name = args["database_name"]
src_table_name = args["src_table_name"]
dst_table_name = args["dst_table_name"]

sc = SparkContext()
glue_context = GlueContext(sc)
spark = glue_context.spark_session
job = Job(glue_context)
job.init(args['JOB_NAME'], args)


def transaction_write(context, dfc) -> DynamicFrameCollection:
    """Write DynamicFrame into a governed table using Lake Formation transactions

    It is called from the processBatch method once per batch, so it makes one transaction per batch.
    """
    dynamic_frame = dfc.select(list(dfc.keys())[0])
    tx_id = context.begin_transaction(read_only=False)
    sink = context.getSink(
        connection_type="s3", 
        path=f"s3://{datalake_bucket_name}/data/{dst_table_name}/",
        enableUpdateCatalog=True, 
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=["eventsource", "eventname"],
        transactionId=tx_id
    )
    sink.setFormat("glueparquet")
    sink.setCatalogInfo(catalogDatabase=database_name, catalogTableName=dst_table_name)
    try:
        sink.writeFrame(dynamic_frame)
        context.commit_transaction(tx_id)
    except Exception:
        context.abort_transaction(tx_id)
        raise 
    return dfc


# Create Spark DataFrame from the source Kinesis table
data_frame_kinesis = glue_context.create_data_frame.from_catalog(
    database=database_name,
    table_name=src_table_name,
    transformation_ctx="data_frame_kinesis",
    additional_options={
        "startingPosition": "TRIM_HORIZON", 
        "inferSchema": "true"
    }
)

# Apply a mapping to drop the useridentity and resources columns, because struct and array types are not supported yet.
cloudtrail_mappings = [
    ("eventversion", "string", "eventversion", "string"), 
    ("eventtime", "string", "eventtime", "string"), 
    ("eventsource", "string", "eventsource", "string"), 
    ("eventname", "string", "eventname", "string"), 
    ("awsregion", "string", "awsregion", "string"), 
    ("sourceipaddress", "string", "sourceipaddress", "string"), 
    ("useragent", "string", "useragent", "string"), 
    ("errorcode", "string", "errorcode", "string"), 
    ("errormessage", "string", "errormessage", "string"), 
    ("requestid", "string", "requestid", "string"), 
    ("eventid", "string", "eventid", "string"), 
    ("eventtype", "string", "eventtype", "string"), 
    ("apiversion", "string", "apiversion", "string"), 
    ("readonly", "boolean", "readonly", "string"), 
    ("recipientaccountid", "string", "recipientaccountid", "string"), 
    ("sharedeventid", "string", "sharedeventid", "string"), 
    ("vpcendpointid", "string", "vpcendpointid", "string")
]


def processBatch(data_frame, batchId):
    """Process each batch triggered by Spark Structured Streaming.

    It is called once per batch to convert the DataFrame to a DynamicFrame, apply the mapping, and call the transaction_write function.
    """
    if data_frame.count() > 0:
        dynamic_frame = DynamicFrame.fromDF(data_frame, glue_context, "from_data_frame")
        dynamic_frame_apply_mapping = ApplyMapping.apply(
            frame=dynamic_frame,
            mappings=cloudtrail_mappings,
            transformation_ctx="apply_mapping"
        )
        dynamic_frame_collection = transaction_write(
            glue_context,
            DynamicFrameCollection({"dynamic_frame": dynamic_frame_apply_mapping}, glue_context)
        )

# Read the streaming DataFrame coming from Kinesis, and run the processBatch method on each batch at 100-second intervals
glue_context.forEachBatch(
    frame=data_frame_kinesis,
    batch_function=processBatch,
    options={
        "windowSize": "100 seconds", 
        "checkpointLocation": f"{args['TempDir']}/checkpoint/{database_name}/{src_table_name}/"
    }
)
job.commit()
  14. Choose Save.
  15. Choose Run.

Appendix 2: Setting up more CloudTrail data to simulate a larger data lake

You can choose from two different approaches to generate more events in CloudTrail to simulate a larger data lake:

  • Enable an S3 data event in CloudTrail
  • Set up an EventBridge event bus with other accounts where there are lots of events

About the Author

Noritaka Sekiyama is a Senior Big Data Architect at AWS Glue & Lake Formation. His passion is implementing software artifacts including libraries, utilities, connectors, tools, docs, and samples that help customers to build their data lakes easily and efficiently. Outside of work, he enjoys learning new technologies, watching anime, and playing with his children.