Build a serverless streaming pipeline with Amazon MSK Serverless, Amazon MSK Connect, and MongoDB Atlas

  • This post was cowritten with Babu Srinivasan and Robert Walters from MongoDB.
    Amazon  Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed, highly available Apache Kafka service. Amazon MSK makes it easy to ingest and process streaming data in real time and use that data easily within the AWS ecosystem. With Amazon MSK Serverless, you can automatically provision and manage required resources to provide on-demand streaming capacity and storage for your applications.

    Amazon MSK also supports integration of data sources such as MongoDB Atlas via Amazon MSK Connect. MSK Connect allows serverless integration of MongoDB data with Amazon MSK using the MongoDB Connector for Apache Kafka.

    MongoDB Atlas Serverless provides database services that dynamically scale up and down with data size and throughput—and the cost scales accordingly. It’s best suited for applications with variable demands to be managed with minimal configuration. It provides high performance and reliability with automated upgrade, encryption, security, metrics, and backup features built in with the MongoDB Atlas infrastructure.

    MSK Serverless is a type of cluster for Amazon MSK. Just like MongoDB Atlas Serverless, MSK Serverless automatically provisions and scales compute and storage resources. You can now create end-to-end serverless workflows. You can build a serverless streaming pipeline with serverless ingestion using MSK Serverless and serverless storage using MongoDB Atlas. In addition, MSK Connect now supports private DNS hostnames. This allows Serverless MSK instances to connect to Serverless MongoDB clusters via AWS PrivateLink, providing you with secure connectivity between platforms.

    If you’re interested in using a non-serverless cluster, refer to Integrating MongoDB with Amazon Managed Streaming for Apache Kafka (MSK).

    This post demonstrates how to implement a serverless streaming pipeline with MSK Serverless, MSK Connect, and MongoDB Atlas.

    Solution overview

    The following diagram illustrates our solution architecture.

    Data flow between AWS MSK and MongoDB Atlas

    The data flow starts with an Amazon Elastic Compute Cloud (Amazon EC2) client instance that writes records to an MSK topic. As data arrives, an instance of the MongoDB Connector for Apache Kafka writes the data to a collection in the MongoDB Atlas Serverless cluster. For secure connectivity between the two platforms, an AWS PrivateLink connection is created between the MongoDB Atlas cluster and the VPC containing the MSK instance.

    This post walks you through the following steps:

    1. Create the serverless MSK cluster.
    2. Create the MongoDB Atlas Serverless cluster.
    3. Configure the MSK plugin.
    4. Create the EC2 client.
    5. Configure an MSK topic.
    6. Configure the MongoDB Connector for Apache Kafka as a sink.

    Configure the serverless MSK cluster

    To create a serverless MSK cluster, complete the following steps:

    1. On the Amazon MSK console, choose Clusters in the navigation pane.
    2. Choose Create cluster.
    3. For Creation method, select Custom create.
    4. For Cluster name, enter MongoDBMSKCluster.
    5. For Cluster type¸ select Serverless.
    6. Choose Next.Serverless MSK Cluster creation UI

    7. On the Networking page, specify your VPC, Availability Zones, and corresponding subnets.
    8. Note the Availability Zones and subnets to use later.Cluster settings showing VPC and Subnets

    9. Choose Next.
    10. Choose Create cluster.

    When the cluster is available, its status becomes Active.

    Cluster Available for Use

    Create the MongoDB Atlas Serverless cluster

    To create a MongoDB Atlas cluster, follow the Getting Started with Atlas tutorial. Note that for the purposes of this post, you need to create a serverless instance.

    Create new cluster dialog

    After the cluster is created, configure an AWS private endpoint with the following steps:

    1. On the Security menu, choose Network Access.Network Access location in the Security menu

    2. On the Private Endpoint tab, choose Serverless Instance.
      Serverless Instance network access

    3. Choose Create new endpoint.
    4. For Serverless Instance, choose the instance you just created.
    5. Choose Confirm.Create Private Endpoint UI

    6. Provide your VPC endpoint configuration and choose Next.VPC Endpoint Configuration UI

    7. When creating the AWS PrivateLink resource, make sure you specify the exact same VPC and subnets that you used earlier when creating the networking configuration for the serverless MSK instance.
    8. Choose Next.VPC Endpoint Subnet Configuration UI

    9. Follow the instructions on the Finalize page, then choose Confirm after your VPC endpoint is created.

    Upon success, the new private endpoint will show up in the list, as shown in the following screenshot.

    Network Access Confirmation Page

    Configure the MSK Plugin

    Next, we create a custom plugin in Amazon MSK using the MongoDB Connector for Apache Kafka. The connector needs to be uploaded to an Amazon Simple Storage Service (Amazon S3) bucket before you can create the plugin. To download the MongoDB Connector for Apache Kafka, refer to Download a Connector JAR File.

    1. On the Amazon MSK console, choose Customized plugins in the navigation pane.
    2. Choose Create custom plugin.
    3. For S3 URI, enter the S3 location of the downloaded connector.
    4. Choose Create custom plugin.

    MSK plugin details

    Configure an EC2 client

    Next, let’s configure an EC2 instance. We use this instance to create the topic and insert data into the topic. For instructions, refer to the section Configure an EC2 client in the post Integrating MongoDB with Amazon Managed Streaming for Apache Kafka (MSK).

    Create a topic on the MSK cluster

    To create a Kafka topic, we need to install the Kafka CLI first.

    1. On the client EC2 instance, first install Java:

    sudo yum install java-1.8.0

    1. Next, run the following command to download Apache Kafka:

    wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz

    1. Unpack the tar file using the following command:

    tar -xzf kafka_2.12-2.6.2.tgz

    The distribution of Kafka includes a bin folder with tools that can be used to manage topics.

    1. Go to the kafka_2.12-2.6.2 directory and issue the following command to create a Kafka topic on the serverless MSK cluster:

    bin/kafka-topics.sh --create --topic sandbox_sync2 --bootstrap-server <BOOTSTRAP SERVER> --command-config=bin/client.properties --partitions 2

    You can copy the bootstrap server endpoint on the View Client Information page for your serverless MSK cluster.

    Bootstrap Server Connection Page

    You can configure IAM authentication by following these instructions.

    Configure the sink connector

    Now, let’s configure a sink connector to send the data to the MongoDB Atlas Serverless instance.

    1. On the Amazon MSK console, choose Connectors in the navigation pane.
    2. Choose Create connector.
    3. Select the plugin you created earlier.
    4. Choose Next.Sink Connector UI

    5. Select the serverless MSK instance that you created earlier.
    6. Enter your connection configuration as the following code:
    connector.class=com.mongodb.kafka.connect.MongoSinkConnector
    key.converter.schema.enable=false
    value.converter.schema.enable=false
    database=MongoDBMSKDemo
    collection=Sink
    tasks.max=1
    topics=MongoDBMSKDemo.Source
    connection.uri=(MongoDB Atlas Connection String Gos Here) 
    value.converter=org.apache.kafka.connect.storage.StringConverter 
    key.converter=org.apache.kafka.connect.storage.StringConverter
    

    Make sure that the connection to the MongoDB Atlas Serverless instance is through AWS PrivateLink. For more information, refer to Connecting Applications Securely to a MongoDB Atlas Data Plane with AWS PrivateLink.

    1. In the Access Permissions section, create an AWS Identity and Access Management (IAM) role with the required trust policy.
    2. Choose Next.IAM Role configuration

    3. Specify Amazon CloudWatch Logs as your log delivery option.
    4. Complete your connector.

    When the connector status changes to Active, the pipeline is ready.

    Connector Confirmation Page

    Insert data into the MSK topic

    On your EC2 client, insert data into the MSK topic using the kafka-console-producer as follows:

    bin/kafka-console-producer.sh --topic sandbox_sync2 --bootstrap-server <BOOTSTRAP SERVER> --producer.config=bin/client.properties

    To verify that data successfully flows from the Kafka topic to the serverless MongoDB cluster, we use the MongoDB Atlas UI.

    MongoDB Atlas Browse Collections UI

    If you run into any issues, be sure to check the log files. In this example, we used CloudWatch to read the events that were generated from Amazon MSK and the MongoDB Connector for Apache Kafka.

    CloudWatch Logs UI

    Clean up

    To avoid incurring future charges, clean up the resources you created. First, delete the MSK cluster, connector, and EC2 instance:

    1. On the Amazon MSK console, choose Clusters in the navigation pane.
    2. Select your cluster and on the Actions menu, choose Delete.
    3. Choose Connectors in the navigation pane.
    4. Select your connector and choose Delete.
    5. Choose Customized plugins in the navigation pane.
    6. Select your plugin and choose Delete.
    7. On the Amazon EC2 console, choose Instances in the navigation pane.
    8. Choose the instance you created.
    9. Choose Instance state, then choose Terminate instance.
    10. On the Amazon VPC console, choose Endpoints in the navigation pane.
    11. Select the endpoint you created and on the Actions menu, choose Delete VPC endpoints.

    Now you can delete the Atlas cluster and AWS PrivateLink:

    1. Log in to the Atlas cluster console.
    2. Navigate to the serverless cluster to be deleted.
    3. On the options drop-down menu, choose Terminate.

    4. Navigate to the Network Access section.
    5. Choose the private endpoint.
    6. Select the serverless instance.
    7. On the options drop-down menu, choose Terminate.Endpoint Termination UI

    Summary

    In this post, we showed you how to build a serverless streaming ingestion pipeline using MSK Serverless and MongoDB Atlas Serverless. With MSK Serverless, you can automatically provision and manage required resources on an as-needed basis. We used a MongoDB connector deployed on MSK Connect to seamlessly integrate the two services, and used an EC2 client to send sample data to the MSK topic. MSK Connect now supports Private DNS hostnames, enabling you to use private domain names between the services. In this post, the connector used the default DNS servers of the VPC to resolve the Availability Zone-specific private DNS name. This AWS PrivateLink configuration allowed secure and private connectivity between the MSK Serverless instance and the MongoDB Atlas Serverless instance.

    To continue your learning, check out the following resources:


About the Authors

Igor Alekseev is a Senior Partner Solution Architect at AWS in Data and Analytics domain. In his role Igor is working with strategic partners helping them build complex, AWS-optimized architectures. Prior joining AWS, as a Data/Solution Architect he implemented many projects in Big Data domain, including several data lakes in Hadoop ecosystem. As a Data Engineer he was involved in applying AI/ML to fraud detection and office automation.

Kiran Matty is a Principal Product Manager with Amazon Web Services (AWS) and works with the Amazon Managed Streaming for Apache Kafka (Amazon MSK) team based out of Palo Alto, California. He is passionate about building performant streaming and analytical services that help enterprises realize their critical use cases.

SaleBestseller No. 1
SAMSUNG Galaxy A54 5G A Series Cell Phone, Unlocked Android Smartphone, 128GB, 6.4” Fluid Display Screen, Pro Grade Camera, Long Battery Life, Refined Design, US Version, 2023, Awesome Black
  • CRISP DETAIL, CLEAR DISPLAY: Enjoy binge-watching...
  • PRO SHOTS WITH EASE: Brilliant sunrises, awesome...
  • CHARGE UP AND CHARGE ON: Always be ready for an...
  • POWERFUL 5G PERFORMANCE: Do what you love most —...
  • NEW LOOK, ADDED DURABILITY: Galaxy A54 5G is...
Bestseller No. 2
OnePlus 12,16GB RAM+512GB,Dual-SIM,Unlocked Android Smartphone,Supports 50W Wireless Charging,Latest Mobile Processor,Advanced Hasselblad Camera,5400 mAh Battery,2024,Flowy Emerald
  • Free 6 months of Google One and 3 months of...
  • Pure Performance: The OnePlus 12 is powered by the...
  • Brilliant Display: The OnePlus 12 has a stunning...
  • Powered by Trinity Engine: The OnePlus 12's...
  • Powerful, Versatile Camera: Explore the new 4th...

 Babu Srinivasan is a Senior Partner Solutions Architect at MongoDB. In his current role, he is working with AWS to build the technical integrations and reference architectures for the AWS and MongoDB solutions. He has more than two decades of experience in Database and Cloud technologies . He is passionate about providing technical solutions to customers working with multiple Global System Integrators(GSIs) across multiple geographies.

Robert Walters is currently a Senior Product Manager at MongoDB. Previous to MongoDB, Rob spent 17 years at Microsoft working in various roles, including program management on the SQL Server team, consulting, and technical pre-sales. Rob has co-authored three patents for technologies used within SQL Server and was the lead author of several technical books on SQL Server. Rob is currently an active blogger on MongoDB Blogs.

New
Fadnou I23 Ultra Unlocked Cell Phone,Built in Pen,Smartphone Battery 6800mAh 6.8" HD Screen Unlocked Phones,6+256GB Android13 with 128G Memory Card,Face ID/Fingerprint Lock/GPS (Purple)
  • 【Octa-Core CPU + 128GB Expandable TF Card】...
  • 【6.8 HD+ Android 13.0】 This is an Android Cell...
  • 【Dual SIM and Global Band 5G Phone】The machine...
  • 【6800mAh Long lasting battery】With the 6800mAh...
  • 【Business Services】The main additional...
New
Huness I15 Pro MAX Smartphone Unlocked Cell Phone,Battery 6800mAh 6.8 HD Screen Unlocked Phone,6+256GB Android 13 with 128GB Memory Card,Dual SIM/5G/Fingerprint Lock/Face ID (Black, 6+256)
  • 【Dimensity 9000 CPU + 128GB Expandable TF...
  • 【6.8 HD+ Android 13.0】 This is an Android Cell...
  • 【Dual SIM and Global Band 5G Phone】Dual SIM &...
  • 【6800mAh Long lasting battery】The I15 Pro MAX...
  • 【Business Services】The main additional...
New
Jopuzia U24 Ultra Unlocked Cell Phone, 5G Smartphone with S Pen, 8GB+256GB Full Netcom Unlocked Phone, 6800mAh Battery 6.8" FHD+ Display 120Hz 80MP Camera, GPS/Face ID/Dual SIM Phone (Rose Gold)
  • 🥇【6.8" HD Unlocked Android Phones】Please...
  • 💗【Octa-Core CPU+ 256GB Storage】U24 Ultra...
  • 💗【Support Global Band 5G Dual SIM】U24 Ultra...
  • 💗【80MP Professional Photography】The U24...
  • 💗【6800mAh Long Lasting Battery】With the...

Original Post>