Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. You can start with just a few hundred gigabytes of data and scale to a petabyte or more. Today, tens of thousands of
customers—from Fortune 500 companies, startups, and everything in between—use
Redshift to run mission-critical business intelligence (BI) dashboards, analyze real-time streaming data, and run predictive analytics. With the constant increase in generated data, Amazon Redshift customers continue to succeed in delivering better service to their end-users, improving their products, and running an efficient and effective business.
In this post, we discuss a customer who is currently using Snowflake to store analytics data. The customer needs to offer this data to clients who are using Amazon Redshift via AWS Data Exchange, the world's most comprehensive service for third-party datasets. We explain in detail how to implement a fully integrated process that automatically ingests data from Snowflake into Amazon Redshift and offers it to clients via AWS Data Exchange.
Overview of the solution
The solution consists of four high-level steps:
- Configure Snowflake to push the changed data for identified tables into an Amazon Simple Storage Service (Amazon S3) bucket.
- Use a custom-built Redshift Auto Loader to load this Amazon S3 landed data to Amazon Redshift.
- Merge the data from the change data capture (CDC) S3 staging tables to Amazon Redshift tables.
- Use Amazon Redshift data sharing to license the data to customers via AWS Data Exchange as a public or private offering.
The following diagram illustrates this workflow.
Prerequisites
To get started, you need the following prerequisites:
- A Snowflake account in the same Region as your Amazon Redshift cluster.
- An S3 bucket. Refer to Create your first S3 bucket for more details.
- An Amazon Redshift cluster with encryption enabled and an AWS Identity and Access Management (IAM) role with permission to the S3 bucket. See Create a sample Amazon Redshift cluster and Create an IAM role for Amazon Redshift for more details.
- A database schema from Snowflake to Amazon Redshift that is migrated using the AWS Schema Conversion Tool (AWS SCT). For more information, refer to Accelerate Snowflake to Amazon Redshift migration using AWS Schema Conversion Tool.
- An IAM role and external Amazon S3 stage for Snowflake access to the S3 bucket you created earlier. For instructions, refer to Configuring Secure Access to Amazon S3. Name this external stage unload_to_s3, pointing to the s3-redshift-loader-source folder of the target S3 bucket. It will be referenced in COPY commands later in this post for offloading the data to Amazon S3. Once created, you should see an external stage created as shown in the following screenshot.
- You must be a registered provider on AWS Data Exchange. For more information, see Providing data products on AWS Data Exchange.
Configure Snowflake to track the changed data and unload it to Amazon S3
In Snowflake, identify the tables that you need to replicate to Amazon Redshift. For the purpose of this demo, we use the data in the TPCH_SF1 schema's Customer, LineItem, and Orders tables of the SNOWFLAKE_SAMPLE_DATA database, which comes out of the box with your Snowflake account.
- Make sure that the Snowflake external stage name unload_to_s3 created in the prerequisites is pointing to the S3 prefix s3-redshift-loader-source created earlier.
- Create a new schema BLOG_DEMO in the DEMO_DB database:
CREATE SCHEMA demo_db.blog_demo;
- Duplicate the Customer, LineItem, and Orders tables in the TPCH_SF1 schema to the BLOG_DEMO schema:
CREATE TABLE CUSTOMER AS SELECT * FROM snowflake_sample_data.tpch_sf1.CUSTOMER;
CREATE TABLE ORDERS AS SELECT * FROM snowflake_sample_data.tpch_sf1.ORDERS;
CREATE TABLE LINEITEM AS SELECT * FROM snowflake_sample_data.tpch_sf1.LINEITEM;
- Verify that the tables have been duplicated successfully:
SELECT table_catalog, table_schema, table_name, row_count, bytes FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = 'BLOG_DEMO' ORDER BY ROW_COUNT;
- Create table streams to track data manipulation language (DML) changes made to the tables, including inserts, updates, and deletes:
CREATE OR REPLACE STREAM CUSTOMER_CHECK ON TABLE CUSTOMER;
CREATE OR REPLACE STREAM ORDERS_CHECK ON TABLE ORDERS;
CREATE OR REPLACE STREAM LINEITEM_CHECK ON TABLE LINEITEM;
- Perform DML changes to the tables (for this post, we run UPDATE on all tables and MERGE on the customer table):
UPDATE customer SET c_comment = 'Sample comment for blog demo' WHERE c_custkey between 0 and 10;
UPDATE orders SET o_comment = 'Sample comment for blog demo' WHERE o_orderkey between 1800001 and 1800010;
UPDATE lineitem SET l_comment = 'Sample comment for blog demo' WHERE l_orderkey between 3600001 and 3600010;
MERGE INTO customer c USING ( SELECT n_nationkey FROM snowflake_sample_data.tpch_sf1.nation s WHERE n_name = 'UNITED STATES') n ON n.n_nationkey = c.c_nationkey WHEN MATCHED THEN UPDATE SET c.c_comment = 'This is US based customer1';
- Validate that the stream tables have recorded all changes:
SELECT * FROM CUSTOMER_CHECK;
SELECT * FROM ORDERS_CHECK;
SELECT * FROM LINEITEM_CHECK;
For example, we can query the following customer key value to verify how the events were recorded for the MERGE statement on the customer table:
SELECT * FROM CUSTOMER_CHECK where c_custkey = 60027;
We can see the METADATA$ISUPDATE column as TRUE, and we see DELETE followed by INSERT in the METADATA$ACTION column.
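To make the DELETE-then-INSERT pattern concrete, the following Python sketch models (outside of Snowflake, under simplified assumptions) how a stream represents a single-row UPDATE as a pair of change records:

```python
# Illustrative model of how a Snowflake stream represents an UPDATE:
# one DELETE record (the old row image) and one INSERT record (the new
# row image), both flagged with METADATA$ISUPDATE = True.
def stream_records_for_update(old_row, new_row):
    return [
        {**old_row, "METADATA$ACTION": "DELETE", "METADATA$ISUPDATE": True},
        {**new_row, "METADATA$ACTION": "INSERT", "METADATA$ISUPDATE": True},
    ]

records = stream_records_for_update(
    {"c_custkey": 60027, "c_comment": "old comment"},
    {"c_custkey": 60027, "c_comment": "This is US based customer1"},
)
for r in records:
    print(r["METADATA$ACTION"], r["METADATA$ISUPDATE"])
```

A pure insert or a pure delete would produce a single record with METADATA$ISUPDATE as FALSE; only updates produce this paired pattern.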
- Run the COPY command to offload the CDC from the stream tables to the S3 bucket using the external stage name unload_to_s3. In the following code, we're also copying the data to S3 folders ending with _stg to ensure that when the Redshift Auto Loader automatically creates these tables in Amazon Redshift, they get created and marked as staging tables:
COPY INTO @unload_to_s3/customer_stg/ FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.customer_check) FILE_FORMAT = (TYPE = PARQUET) OVERWRITE = TRUE HEADER = TRUE;
COPY INTO @unload_to_s3/orders_stg/ FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.orders_check) FILE_FORMAT = (TYPE = PARQUET) OVERWRITE = TRUE HEADER = TRUE;
COPY INTO @unload_to_s3/lineitem_stg/ FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.lineitem_check) FILE_FORMAT = (TYPE = PARQUET) OVERWRITE = TRUE HEADER = TRUE;
- Verify the data in the S3 bucket. There will be three sub-folders created in the s3-redshift-loader-source folder of the S3 bucket, and each will have .parquet data files.
You can also automate the preceding COPY commands using tasks, which can be scheduled to run at a set frequency for automatic copy of CDC data from Snowflake to Amazon S3.
- Use the ACCOUNTADMIN role to assign the EXECUTE TASK privilege. In this scenario, we're assigning the privileges to the SYSADMIN role:
USE ROLE accountadmin;
GRANT EXECUTE TASK, EXECUTE MANAGED TASK ON ACCOUNT TO ROLE sysadmin;
- Use the SYSADMIN role to create three separate tasks to run the three COPY commands every 5 minutes:
USE ROLE sysadmin;
/* Task to offload the Customer CDC table */
CREATE TASK sf_rs_customer_cdc
WAREHOUSE = SMALL
SCHEDULE = 'USING CRON */5 * * * * UTC'
AS
COPY INTO @unload_to_s3/customer_stg/ FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.customer_check) FILE_FORMAT = (TYPE = PARQUET) OVERWRITE = TRUE HEADER = TRUE;
/* Task to offload the Orders CDC table */
CREATE TASK sf_rs_orders_cdc
WAREHOUSE = SMALL
SCHEDULE = 'USING CRON */5 * * * * UTC'
AS
COPY INTO @unload_to_s3/orders_stg/ FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.orders_check) FILE_FORMAT = (TYPE = PARQUET) OVERWRITE = TRUE HEADER = TRUE;
/* Task to offload the Lineitem CDC table */
CREATE TASK sf_rs_lineitem_cdc
WAREHOUSE = SMALL
SCHEDULE = 'USING CRON */5 * * * * UTC'
AS
COPY INTO @unload_to_s3/lineitem_stg/ FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.lineitem_check) FILE_FORMAT = (TYPE = PARQUET) OVERWRITE = TRUE HEADER = TRUE;
When the tasks are first created, they're in a SUSPENDED state.
- Alter the three tasks and set them to RESUME state:
ALTER TASK sf_rs_customer_cdc RESUME; ALTER TASK sf_rs_orders_cdc RESUME; ALTER TASK sf_rs_lineitem_cdc RESUME;
- Validate that all three tasks have been resumed successfully:
SHOW TASKS;
Now the tasks will run every 5 minutes and look for new data in the stream tables to offload to Amazon S3.
As soon as data is migrated from Snowflake to Amazon S3, the Redshift Auto Loader automatically infers the schema and creates the corresponding tables in Amazon Redshift. Then, by default, it loads data from Amazon S3 to Amazon Redshift every 5 minutes. You can change this default 5-minute setting.
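Cron syntax is an easy place to slip up here: in the minute field, */5 means "every 5 minutes," whereas a bare 5 means "once per hour, at minute 5." The following minimal Python matcher (a simplified illustration, not a full cron parser) makes the difference explicit:

```python
# Minimal matcher for the minute field of a cron expression, showing why
# '*/5 * * * *' fires every 5 minutes while '5 * * * *' fires once per
# hour at minute 5. Simplified sketch: handles only '*', '*/N', and a
# bare number, which is enough for this comparison.
def minute_matches(minute_field, minute):
    if minute_field == "*":
        return True
    if minute_field.startswith("*/"):
        step = int(minute_field[2:])
        return minute % step == 0
    return minute == int(minute_field)

every_5 = [m for m in range(60) if minute_matches("*/5", m)]
hourly_at_5 = [m for m in range(60) if minute_matches("5", m)]
print(len(every_5), hourly_at_5)
```

Running this shows 12 matching minutes per hour for */5 versus a single match for 5, which is why the task schedules above use USING CRON */5 * * * * UTC for a 5-minute cadence.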
- On the Amazon Redshift console, launch the query editor v2 and connect to your Amazon Redshift cluster.
- Browse to the dev database, public schema, and expand Tables.
You can see three staging tables created with the same names as the corresponding folders in Amazon S3.
- Validate the data in one of the tables by running the following query:
SELECT * FROM "dev"."public"."customer_stg";
Configure the Redshift Auto Loader utility
The Redshift Auto Loader makes data ingestion to Amazon Redshift significantly easier because it automatically loads data files from Amazon S3 to Amazon Redshift. The files are mapped to the respective tables by simply dropping files into preconfigured locations on Amazon S3. For more details about the architecture and internal workflow, refer to the GitHub repo.
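The folder-to-table mapping convention can be sketched as follows. This is a hypothetical illustration of the naming convention used in this post (files land under s3-redshift-loader-source/<table>/, and folders ending in _stg become staging tables); the actual mapping logic is implemented by the Redshift Auto Loader utility in the GitHub repo.

```python
# Hypothetical sketch of mapping an S3 object key to a target table,
# following the convention in this post. The real implementation is the
# Redshift Auto Loader utility; this only illustrates the convention.
def key_to_table(key, source_prefix="s3-redshift-loader-source/"):
    # Strip the configured source prefix, then take the first folder
    # component as the table name.
    relative = key[len(source_prefix):] if key.startswith(source_prefix) else key
    table = relative.split("/")[0]
    # Folders ending in _stg are treated as CDC staging tables.
    return {"table": table, "is_staging": table.endswith("_stg")}

mapping = key_to_table("s3-redshift-loader-source/customer_stg/data_0_0_0.parquet")
print(mapping)
```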
We use an AWS CloudFormation template to set up Redshift Auto Loader. Complete the following steps:
- Launch the CloudFormation template.
- Choose Next.
- For Stack name, enter a name.
- Provide the parameters listed in the following table.
| CloudFormation Template Parameter | Allowed Values | Description |
| --- | --- | --- |
| RedshiftClusterIdentifier | Amazon Redshift cluster identifier | Enter the Amazon Redshift cluster identifier. |
| DatabaseUserName | Database user name in the Amazon Redshift cluster | The Amazon Redshift database user name that has access to run the SQL script. |
| DatabaseName | Database name in the Amazon Redshift cluster | The name of the Amazon Redshift primary database where the SQL script is run. |
| DatabaseSchemaName | Schema name in the Amazon Redshift cluster | The Amazon Redshift schema name where the tables are created. |
| RedshiftIAMRoleARN | Default, or a valid IAM role ARN attached to the Amazon Redshift cluster | The IAM role ARN associated with the Amazon Redshift cluster. If the default IAM role is set for the cluster and has access to your S3 bucket, leave it at the default. |
| CopyCommandOptions | COPY options; default is delimiter '\|' gzip | The additional COPY command data format parameters. If InitiateSchemaDetection = Yes, the process attempts to detect the schema and automatically set suitable COPY command options. If schema detection fails, or when InitiateSchemaDetection = No, this value is used as the default COPY command options to load data. |
| SourceS3Bucket | S3 bucket name | The S3 bucket where the data is stored. Make sure the IAM role associated with the Amazon Redshift cluster has access to this bucket. |
| InitiateSchemaDetection | Yes/No | Set to Yes to dynamically detect the schema prior to file load and create the table in Amazon Redshift if it doesn't already exist. If a table already exists, the process won't drop or recreate it. If schema detection fails, the process uses the default COPY options as specified in CopyCommandOptions. |

The Redshift Auto Loader uses the COPY command to load data into Amazon Redshift. For this post, set CopyCommandOptions as follows, and configure any supported COPY command options:
delimiter '|' dateformat 'auto' TIMEFORMAT 'auto'
- Choose Next.
- Accept the default values on the next page and choose Next.
- Select the acknowledgement check box and choose Create stack.
- Monitor the progress of the stack creation and wait until it is complete.
- To verify the Redshift Auto Loader configuration, sign in to the Amazon S3 console and navigate to the S3 bucket you provided. You should see that a new directory, s3-redshift-loader-source, has been created.
Copy all the data files exported from Snowflake under s3-redshift-loader-source.
Merge the data from the CDC S3 staging tables to Amazon Redshift tables
To merge your data from Amazon S3 to Amazon Redshift, complete the following steps:
- Create a temporary staging table merge_stg and insert all the rows from the S3 staging table that have metadata$action as INSERT, using the following code. This includes all the new inserts as well as the updates.
CREATE TEMP TABLE merge_stg AS
SELECT * FROM (
SELECT *, DENSE_RANK() OVER (PARTITION BY c_custkey ORDER BY last_updated_ts DESC) AS rnk
FROM customer_stg
)
WHERE rnk = 1 AND metadata$action = 'INSERT';
The preceding code uses the window function DENSE_RANK() to select the latest entries for a given c_custkey, assigning a rank to each row for that c_custkey with the data arranged in descending order of last_updated_ts. We then select the rows with rnk=1 and metadata$action = 'INSERT' to capture all the inserts.
- Use the S3 staging table customer_stg to delete the records from the base table customer that are marked as deletes or updates:
DELETE FROM customer USING customer_stg WHERE customer.c_custkey = customer_stg.c_custkey;
This deletes all the rows that are present in the CDC S3 staging table, which takes care of rows marked for deletion and updates.
- Use the temporary staging table merge_stg to insert the records marked for updates or inserts:
INSERT INTO customer SELECT c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment FROM merge_stg;
- Truncate the staging table, because we have already updated the target table:
truncate customer_stg;
- You can also run the preceding steps as a stored procedure:
CREATE OR REPLACE PROCEDURE merge_customer()
AS $$
BEGIN
/* Create a temp table with the latest records for updates/new inserts */
CREATE TEMP TABLE merge_stg AS
SELECT * FROM (
SELECT *, DENSE_RANK() OVER (PARTITION BY c_custkey ORDER BY last_updated_ts DESC) AS rnk
FROM customer_stg
)
WHERE rnk = 1 AND metadata$action = 'INSERT';

/* Delete from the base table, using the CDC staging table, all the records marked as deletes or updates */
DELETE FROM customer
USING customer_stg
WHERE customer.c_custkey = customer_stg.c_custkey;

/* Insert new/updated records into the base table */
INSERT INTO customer
SELECT c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment
FROM merge_stg;

TRUNCATE customer_stg;
END;
$$ LANGUAGE plpgsql;
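To see what this merge does to the data, the following Python sketch simulates the same logic against in-memory rows instead of Redshift tables (a simplified illustration: for each key, keep only the latest staged change, delete any matching base rows, then insert the staged rows marked INSERT):

```python
# Pure-Python simulation of the stored procedure's merge logic, run on
# in-memory rows. Mirrors: DENSE_RANK() ... rnk = 1 (latest change per
# key), DELETE USING the staging table, then INSERT the latest images.
def merge_cdc(base, staging, key="c_custkey", ts="last_updated_ts"):
    # Latest change timestamp per key across all staged records.
    max_ts = {}
    for row in staging:
        k = row[key]
        max_ts[k] = max(max_ts.get(k, row[ts]), row[ts])
    # Keep only INSERT images at the latest timestamp (rnk = 1).
    inserts = [r for r in staging
               if r["metadata$action"] == "INSERT" and r[ts] == max_ts[r[key]]]
    # DELETE FROM base USING staging: drop every key present in staging
    # (covers both deletes and the old images of updated rows).
    staged_keys = {r[key] for r in staging}
    merged = [r for r in base if r[key] not in staged_keys]
    # INSERT the surviving images, stripping the CDC bookkeeping columns.
    merged += [{k: v for k, v in r.items()
                if not k.startswith("metadata$") and k != ts}
               for r in inserts]
    return merged

base = [{"c_custkey": 1, "c_comment": "old"}, {"c_custkey": 2, "c_comment": "keep"}]
staging = [  # an UPDATE of key 1 appears as a DELETE + INSERT pair
    {"c_custkey": 1, "c_comment": "old", "metadata$action": "DELETE", "last_updated_ts": 1},
    {"c_custkey": 1, "c_comment": "new", "metadata$action": "INSERT", "last_updated_ts": 1},
]
result = sorted(merge_cdc(base, staging), key=lambda r: r["c_custkey"])
print(result)
```

Customer 1 ends up with the new comment, customer 2 is untouched, and a key whose latest staged change is a bare DELETE would simply disappear from the base table.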
For example, let’s look at the before and after states of the customer table when there’s been a change in data for a particular customer.
The following screenshot shows the new changes recorded in the customer_stg table for c_custkey = 74360.
We can see two records for the customer with c_custkey = 74360: one with metadata$action as DELETE and one with metadata$action as INSERT. That means the record with c_custkey = 74360 was updated at the source, and these changes need to be applied to the target customer table in Amazon Redshift.
The following screenshot shows the current state of the customer table before these changes have been merged using the preceding stored procedure:
- Now, to update the target table, we can run the stored procedure as follows:
CALL merge_customer();
The following screenshot shows the final state of the target table after the stored procedure is complete.
Run the stored procedure on a schedule
You can also run the stored procedure on a schedule via Amazon EventBridge. The scheduling steps are as follows:
- On the EventBridge console, choose Create rule.
- For Name, enter a meaningful name, for example, Trigger-Snowflake-Redshift-CDC-Merge.
- For Event bus, choose default.
- For Rule Type, select Schedule.
- Choose Next.
- For Schedule pattern, select A schedule that runs at a regular rate, such as every 10 minutes.
- For Rate expression, enter Value as 5 and choose Unit as Minutes.
- Choose Next.
- For Target types, choose AWS service.
- For Select a Target, choose Redshift cluster.
- For Cluster, choose the Amazon Redshift cluster identifier.
- For Database name, choose dev.
- For Database user, enter a user name with access to run the stored procedure. It uses temporary credentials to authenticate.
- Optionally, you can also use AWS Secrets Manager for authentication.
- For SQL statement, enter CALL merge_customer().
- For Execution role, select Create a new role for this specific resource.
- Choose Next.
- Review the rule parameters and choose Create rule.
After the rule has been created, it automatically triggers the stored procedure in Amazon Redshift every 5 minutes to merge the CDC data into the target table.
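The console steps above can also be expressed programmatically. The following hedged sketch only builds the parameter dictionaries you would pass to the EventBridge PutRule and PutTargets API calls; the ARNs and the database user are placeholders, and you should verify the exact target fields against the EventBridge API reference before use.

```python
# Sketch of the EventBridge request payloads for scheduling the merge.
# The ARNs and DbUser below are placeholder values. RedshiftDataParameters
# is the EventBridge target field for running SQL via the Redshift Data API.
def build_schedule_rule(name, rate_minutes):
    return {
        "Name": name,
        "ScheduleExpression": f"rate({rate_minutes} minutes)",
        "State": "ENABLED",
    }

def build_redshift_target(rule_name, cluster_arn, role_arn):
    return {
        "Rule": rule_name,
        "Targets": [{
            "Id": "redshift-merge-target",
            "Arn": cluster_arn,
            "RoleArn": role_arn,
            "RedshiftDataParameters": {
                "Database": "dev",
                "DbUser": "awsuser",  # placeholder database user
                "Sql": "CALL merge_customer();",
            },
        }],
    }

rule = build_schedule_rule("Trigger-Snowflake-Redshift-CDC-Merge", 5)
target = build_redshift_target(
    rule["Name"],
    "arn:aws:redshift:us-east-1:111122223333:cluster:demo",  # placeholder
    "arn:aws:iam::111122223333:role/eventbridge-redshift-role",  # placeholder
)
print(rule["ScheduleExpression"])
```

These dictionaries would then be passed to an EventBridge client's put_rule and put_targets calls respectively.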
Configure Amazon Redshift to share the identified data with AWS Data Exchange
Now that you have the data stored inside Amazon Redshift, you can publish it to customers using AWS Data Exchange.
- In Amazon Redshift, using any query editor, create the data share and add the tables to be shared:
CREATE DATASHARE salesshare MANAGEDBY ADX;
ALTER DATASHARE salesshare ADD SCHEMA tpch_sf1;
ALTER DATASHARE salesshare ADD TABLE tpch_sf1.customer;
- On the AWS Data Exchange console, create your dataset.
- Select Amazon Redshift datashare.
- Create a revision in the dataset.
- Add assets to the revision (in this case, the Amazon Redshift data share).
- Finalize the revision.
After you create the dataset, you can publish it to the public catalog or directly to customers as a private product. For instructions on how to create and publish products, refer to NEW – AWS Data Exchange for Amazon Redshift.
Clean up
To avoid incurring future charges, complete the following steps:
- Delete the CloudFormation stack used to create the Redshift Auto Loader.
- Delete the Amazon Redshift cluster created for this demonstration.
- If you were using an existing cluster, drop the created external table and external schema.
- Delete the S3 bucket you created.
- Delete the Snowflake objects you created.
Conclusion
In this post, we demonstrated how you can set up a fully integrated process that continuously replicates data from Snowflake to Amazon Redshift and then uses Amazon Redshift to offer data to downstream clients over AWS Data Exchange. You can use the same architecture for other purposes, such as sharing data with other Amazon Redshift clusters within the same account, cross-accounts, or even cross-Regions if needed.
About the Authors
Raks Khare is an Analytics Specialist Solutions Architect at AWS based out of Pennsylvania. He helps customers architect data analytics solutions at scale on the AWS platform.
Ekta Ahuja is a Senior Analytics Specialist Solutions Architect at AWS. She is passionate about helping customers build scalable and robust data and analytics solutions. Before AWS, she worked in several different data engineering and analytics roles. Outside of work, she enjoys baking, traveling, and board games.
Tahir Aziz is an Analytics Solution Architect at AWS. He has worked on building data warehouses and big data solutions for over 13 years. He loves to help customers design end-to-end analytics solutions on AWS. Outside of work, he enjoys traveling and cooking.
Ahmed Shehata is a Senior Analytics Specialist Solutions Architect at AWS based in Toronto. He has more than two decades of experience helping customers modernize their data platforms. Ahmed is passionate about helping customers build efficient, performant, and scalable analytics solutions.