Using DynamoDB Streams to sync back to your old database

In my continuing saga of database and API modernization, I'm going to explore using DynamoDB Streams to back-propagate data to the old MySQL database. I'm hoping it won't come to this in practice...

For our "legacy" system, we're going to assume we have an API Gateway that invokes lambda functions that talk to an Aurora Serverless (MySQL) database.

We're then going to update the API Gateway to read/write to DynamoDB instead... and use DynamoDB Streams to reuse the legacy lambdas to write BACK to Aurora.

For the demo I'm going to create separate endpoints so I can continue using both, but in practice you'd do the cutover.

Pre-reqs

On my local machine I have the following installed:

$ node --version
v12.13.1
$ npm --version
6.12.1
$ aws --version
aws-cli/2.0.23 Python/3.7.4 Darwin/19.5.0 botocore/2.0.0dev27
$ cdk --version
1.45.0 (build 0cfab15)

Once you have those things... let's bootstrap a project using Amazon's CDK

mkdir blog-cdk-streams && cd blog-cdk-streams
cdk init --language typescript
npm i @aws-cdk/aws-apigateway @aws-cdk/aws-lambda @aws-cdk/aws-lambda-nodejs @aws-cdk/aws-dynamodb @aws-cdk/aws-rds @aws-cdk/aws-secretsmanager @aws-cdk/aws-ssm @aws-cdk/aws-ec2 @aws-cdk/aws-lambda-event-sources aws-sdk --save

If, instead, you want to clone the project... have at it:

{% github martzcodes/blog-cdk-streams no-readme %}

Mocking the "Legacy"

I'm going to provide links to the code for setting up the "legacy" configuration, but I'm not going to go through it in detail since it's a less important aspect of this post.

Create an Aurora Serverless MySQL table

AWS and the CDK make it surprisingly difficult to spin up a simple RDS implementation... you have to pre-configure a lot of things. I'm hoping one day they'll bake that into the CDK... Until then, I found this post useful as inspiration. I'm going to start with their Aurora Serverless class to get started a bit faster.

CDK not making Aurora very easy...

My version of this code is here: github.com/martzcodes/blog-cdk-streams/blob..

That blog post probably worked when it was written, but AWS seems to have made some changes since. I had to add one VERY IMPORTANT line to the cluster creation method:

const dbcluster = new CfnDBCluster(this, "apidbcluster", {
  engine: "aurora",
  engineMode: "serverless",
  masterUsername: secret.secretValueFromJson("username").toString(),
  masterUserPassword: secret.secretValueFromJson("password").toString(),
  deletionProtection: false,
  enableHttpEndpoint: true, // this is important!
  scalingConfiguration: {
    autoPause: true,
    minCapacity: 1,
    maxCapacity: 16,
    secondsUntilAutoPause: 300,
  },
  dbSubnetGroupName: new CfnDBSubnetGroup(this, "db-subnet-group", {
    dbSubnetGroupDescription: `${props.clusterName} database cluster subnet group`,
    subnetIds: props.vpc.selectSubnets({
      subnetType: ec2.SubnetType.PRIVATE,
    }).subnetIds,
  }).ref,
});

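Setting enableHttpEndpoint turns on the Data API for the cluster, which is half of what allows the lambdas to talk to the Aurora Serverless cluster (the other half is the policy statements the lambdas get later on). Without it, you'd end up getting this error: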

Exception BadRequestException: HttpEndpoint is not enabled for cluster blogcdkstreamsstack-auroraserverlesschefapidbclus-1j3rs66zfzfkc. Please refer to https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/data-api.html#data-api.troubleshooting

Aurora Lambdas

On the Aurora side of things for this demo to work I need 3 lambdas with permissions to read/write to Aurora.

All the files end up following the same basic pattern:

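import * as AWS from "aws-sdk";
import { RDSDataService } from "aws-sdk";

// Update the global SDK config BEFORE creating the client: service objects
// pick up the global settings when they are constructed, so changes made
// afterwards don't apply to a client that already exists.
AWS.config.update({
  maxRetries: 10,
  httpOptions: {
    timeout: 60000,
    connectTimeout: 60000,
  },
});

const RDSDATA = new AWS.RDSDataService();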

export const handler = async (event: any): Promise<any> => {
  try {
    console.log("START");
    console.log(event);
    console.log("ENV SECRETARN: " + process.env.SECRETARN);
    console.log("ENV DBCLUSTERARN: " + process.env.DBCLUSTERARN);
    console.log("ENV DBCLUSTERID: " + process.env.DBCLUSTERID);

    // TRY AND DO STUFF HERE

  } catch (e) {
    console.log(e);
    return {
      statusCode: 500, // report the failure instead of masking it as a 200
      headers: { "Content-Type": "text/html; charset=utf-8" },
      body: `<html><body>Exception ${e} You've hit ${event.path} </body></html>\n`,
    };
  }
};
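
The `// TRY AND DO STUFF HERE` placeholder is where each lambda calls the Data API. As a rough sketch of what the insert case could look like, here's a helper that builds the request for `executeStatement`. The helper name, the `chefs` table, its column names, and the `chefdb` database name are all assumptions of mine for illustration; only the `DBCLUSTERARN` and `SECRETARN` environment variables come from the handler above.

```typescript
// Subset of the request shape accepted by RDSDataService.executeStatement.
interface ExecuteStatementRequest {
  resourceArn: string;
  secretArn: string;
  database: string;
  sql: string;
  parameters: { name: string; value: { stringValue: string } }[];
}

// Hypothetical helper: builds a parameterized INSERT for a single chef.
// The "chefs" table, its columns, and the default database name are assumptions.
const buildInsertChefRequest = (
  firstName: string,
  lastName: string,
  env: { DBCLUSTERARN: string; SECRETARN: string },
  database = "chefdb"
): ExecuteStatementRequest => ({
  resourceArn: env.DBCLUSTERARN,
  secretArn: env.SECRETARN,
  database,
  // Named placeholders (:firstName) are bound from `parameters`,
  // so user input is never concatenated into the SQL string.
  sql: "INSERT INTO chefs (firstName, lastName) VALUES (:firstName, :lastName)",
  parameters: [
    { name: "firstName", value: { stringValue: firstName } },
    { name: "lastName", value: { stringValue: lastName } },
  ],
});

// Placeholder ARNs, not real resources.
const request = buildInsertChefRequest("Alton", "Brown", {
  DBCLUSTERARN: "arn:aws:rds:us-east-1:123456789012:cluster:example",
  SECRETARN: "arn:aws:secretsmanager:us-east-1:123456789012:secret:example",
});
console.log(JSON.stringify(request, null, 2));
```

Inside the handler, a request like this would be sent with `await RDSDATA.executeStatement(request).promise()`.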

For the INSERT lambda I have some actually interesting code:

interface Chef {
  firstName: string;
  lastName: string;
}

const processEvent = (event: any): Chef[] => {
  if (event.body) {
    const body = JSON.parse(event.body);
    return [
      {
        firstName: body.firstName || "Gordon",
        lastName: body.lastName || "Ramsay",
      },
    ];
  }
  if (event.Records) {
    const inserts = event.Records.filter(
      (evt: any) => evt.eventName === "INSERT"
    );
    const items = inserts.map((ins: any) => {
      return {
        firstName: ins.dynamodb.NewImage.firstName.S || "Gordon",
        lastName: ins.dynamodb.NewImage.lastName.S || "Ramsay",
      };
    });
    return items;
  }
  return [];
};

That code checks the lambda event to see whether it's coming from the API Gateway or from the DynamoDB Stream trigger (spoilers). If it has a body, it came from the API Gateway; if it has Records, it came from the stream. Either way, I process it down into a common interface so the same lambda can be used in BOTH cases.
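
To make the two cases concrete, here's a small runnable sketch that feeds both event shapes through the function. The event objects are heavily trimmed-down stand-ins I made up (real API Gateway and stream events carry many more fields), and the `ChefId` values are hypothetical.

```typescript
interface Chef {
  firstName: string;
  lastName: string;
}

// Condensed copy of processEvent from above so this example runs on its own.
const processEvent = (event: any): Chef[] => {
  if (event.body) {
    const body = JSON.parse(event.body);
    return [
      { firstName: body.firstName || "Gordon", lastName: body.lastName || "Ramsay" },
    ];
  }
  if (event.Records) {
    return event.Records
      .filter((evt: any) => evt.eventName === "INSERT")
      .map((ins: any) => ({
        firstName: ins.dynamodb.NewImage.firstName.S || "Gordon",
        lastName: ins.dynamodb.NewImage.lastName.S || "Ramsay",
      }));
  }
  return [];
};

// Trimmed-down API Gateway proxy event: the payload arrives as a JSON string.
const apiGatewayEvent = {
  path: "/aurora/chefs",
  body: JSON.stringify({ firstName: "Alton", lastName: "Brown" }),
};

// Trimmed-down DynamoDB stream event: attributes use typed wrappers ("S" = string).
const streamEvent = {
  Records: [
    {
      eventName: "INSERT",
      dynamodb: {
        NewImage: {
          ChefId: { S: "hypothetical-id-1" },
          firstName: { S: "Bobby" },
          lastName: { S: "Flay" },
        },
      },
    },
    // Anything that isn't an INSERT is dropped by the filter.
    { eventName: "REMOVE", dynamodb: { Keys: { ChefId: { S: "hypothetical-id-2" } } } },
  ],
};

const fromApi = processEvent(apiGatewayEvent);
const fromStream = processEvent(streamEvent);
console.log(fromApi);
console.log(fromStream);
```

One thing this makes visible: only INSERT records pass the filter, so updates and deletes made on the DynamoDB side would not be back-propagated by this lambda as written.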

It works

Alternatively, you could leave the legacy lambda untouched and put a pre-processing step in front of it (for example, a step function fed by the DynamoDB stream trigger) that reshapes the data before sending it to the "original" / "legacy" lambda.

The three lambdas get created in the main blog-cdk-streams-stack.ts file using the experimental aws-lambda-nodejs module for CDK.

They then get assigned the policy statements needed to talk to Aurora.
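
I'm not reproducing the exact statements from the repo here, but as a hedged sketch, a Data API caller generally needs permission to run statements against the cluster and to read the secret holding the database credentials. The helper below is hypothetical and just builds the statements as plain IAM JSON; the ARNs are placeholders.

```typescript
interface PolicyStatementJson {
  Effect: "Allow";
  Action: string[];
  Resource: string[];
}

// Hypothetical helper: the two kinds of permission a Data API caller needs.
const dataApiStatements = (
  clusterArn: string,
  secretArn: string
): PolicyStatementJson[] => [
  {
    // Run SQL and manage transactions through the Data API.
    Effect: "Allow",
    Action: [
      "rds-data:ExecuteStatement",
      "rds-data:BatchExecuteStatement",
      "rds-data:BeginTransaction",
      "rds-data:CommitTransaction",
      "rds-data:RollbackTransaction",
    ],
    Resource: [clusterArn],
  },
  {
    // The Data API authenticates with the secret, so the caller must be able to read it.
    Effect: "Allow",
    Action: ["secretsmanager:GetSecretValue"],
    Resource: [secretArn],
  },
];

// Placeholder ARNs, not real resources.
const statements = dataApiStatements(
  "arn:aws:rds:us-east-1:123456789012:cluster:example",
  "arn:aws:secretsmanager:us-east-1:123456789012:secret:example"
);
console.log(JSON.stringify(statements, null, 2));
```

In the CDK stack, the same actions and resources would go into the policy statements attached to each lambda's role.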

Legacy APIGateway

Finally to tie the "legacy" side together we create the resources and endpoints to use the lambdas...

GET <apigatewayurl>/aurora/ # init endpoint
GET <apigatewayurl>/aurora/chefs # get all chefs
POST <apigatewayurl>/aurora/chefs # insert a chef

Deploy "Legacy"

At this point we'd have everything up to Line 111... if you use my code you can comment out the stuff below and deploy and you'd have a working version of the "Legacy System".

To deploy the code to AWS it's as easy as running:

cdk bootstrap --profile personal
cdk deploy --profile personal

Let it bake...

Run those and then let it bake.

The size of the changes is pretty impressive, but that's because it has to set up so much infrastructure to get Aurora up and running.

Create the DynamoDB Table

Now that we've established our mock "legacy" system... we can get to the fun stuff.

Create the DynamoDB table with streams enabled:

    const dynamoTable = new Table(this, serviceName, {
      billingMode: BillingMode.PAY_PER_REQUEST,
      partitionKey: {
        name: `ChefId`,
        type: AttributeType.STRING,
      },
      removalPolicy: RemovalPolicy.DESTROY,
      tableName: serviceName,
      stream: StreamViewType.NEW_IMAGE, // this enables the streams
    });

That's it!

DynamoDB Lambdas

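There are only two "modernized" lambdas: one that writes chefs to DynamoDB (updateDynamoChef) and one that reads them back (getDynamoChefs).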

And connecting them to DynamoDB is much simpler using grantReadData and grantReadWriteData...

    dynamoTable.grantReadWriteData(updateDynamoChef);
    dynamoTable.grantReadData(getDynamoChefs);
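
For a feel of what the write side might do, here's a minimal sketch of building the put parameters for one chef. The helper name, the table name, and the way the ChefId is supplied are assumptions of mine; the only thing taken from the table definition above is that ChefId is the string partition key.

```typescript
interface Chef {
  firstName: string;
  lastName: string;
}

interface PutChefParams {
  TableName: string;
  Item: Chef & { ChefId: string };
}

// Hypothetical helper: builds DocumentClient-style put parameters.
// The caller supplies the id; how ids are generated is left open here.
const buildPutChefParams = (
  tableName: string,
  chefId: string,
  chef: Chef
): PutChefParams => ({
  TableName: tableName,
  // ChefId is the table's partition key, so every item must carry it.
  Item: { ChefId: chefId, ...chef },
});

const putParams = buildPutChefParams("chefs-demo", "hypothetical-id-1", {
  firstName: "Bobby",
  lastName: "Flay",
});
console.log(putParams);
```

Parameters like these would be passed to `new AWS.DynamoDB.DocumentClient().put(putParams).promise()`. Note that the DocumentClient works with plain values, while the stream's NewImage carries the typed form (`{ S: "Bobby" }`), which is why the legacy lambda reads `.S` from each attribute.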

Update the API Gateway

For the demo's sake I'm going to keep the old endpoints. In practice I would update which functions the original endpoints were calling.

GET <apigatewayurl>/dynamo/chefs # get all chefs
POST <apigatewayurl>/dynamo/chefs # insert a chef

DynamoDB Stream to trigger the legacy lambda

Finally... add an event source to the legacy lambda to trigger on the DynamoDB stream event...

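    const eventSourceProps: DynamoEventSourceProps = {
      // LATEST: start with records written after the trigger is created
      startingPosition: StartingPosition.LATEST,
      // hand the lambda at most one stream record per invocation
      batchSize: 1,
    };
    writeAuroraChefs.addEventSource(new DynamoEventSource(dynamoTable as any, eventSourceProps) as any);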

Yeah... it's that simple...

Pièce de résistance

After re-running cdk deploy --profile personal and waiting for the updates to happen... let's walk through our endpoints.

I stored all of the requests here, so I'm just going to highlight the interesting bits.

1. GET https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/aurora/ initializes the DB
2. GET https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/aurora/chefs returns an empty list of chefs
3. POST https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/aurora/chefs {"firstName":"Alton","lastName":"Brown"} inserts `Alton Brown` in the table
4. GET https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/aurora/chefs (same endpoint as 2) returns "Alton Brown" row in the list
5. GET https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/dynamo/chefs returns an empty table from dynamo
6. POST https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/dynamo/chefs {"firstName":"Bobby","lastName":"Flay"} adds Bobby Flay to dynamo
7. GET https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/dynamo/chefs confirms Bobby Flay is in dynamo
8. GET https://nlqemzi885.execute-api.us-east-1.amazonaws.com/prod/aurora/chefs (same endpoint as 2 and 4) returns Alton Brown and Bobby Flay in Aurora!

Well said Chef Ramsay...