12

Using WebSockets in AWS to Stream DynamoDB Updates

 3 years ago
source link: https://spin.atomicobject.com/2021/01/06/websockets-aws-dynamodb-updates/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

A recent project of mine was potentially in need of a WebSocket API, and I wanted to investigate how to set up a WebSocket API in the AWS ecosystem.

Extending the infrastructure from my previous DynamoDB setup to stream updates to a client via WebSockets was a straightforward problem to solve. I updated the Github project from my last post to connect to an WebSocket API. Here’s what I did to set it up.

WebSocket API

To create a WebSocket API in AWS, I needed to utilize three services:

  • A DynamoDB table to manage connections
  • An API Gateway to create the API
  • Lambda functions to handle connection and disconnection events for the API

First, I created a DynamoDB table to manage the connections for the WebSocket API. This table is separate from the table that I’ll be streaming changes from. It’s a simple table with just a ConnectionId column. Here’s the configuration:

Next, I needed to create the WebSocket API in API gateway. However, the AWS console was going to prompt me for Lambda functions to handle connect and disconnect actions from clients, so I created those first.

Here’s the connect Lambda code:

JavaScript
// connect.js
const DynamoDB = require('aws-sdk/clients/dynamodb');

exports.handler = async function(event, context, callback) {
  const db = new DynamoDB.DocumentClient();
  var putParams = {
    TableName: process.env.TABLE_NAME, // In our case, "WebSocketManager"
    Item: {
      ConnectionId: event.requestContext.connectionId,
    }
  };

  try {
    // Insert incoming connection id in the WebSocket
    await db.put(putParams).promise();

    return {
      statusCode: 200,
      body: "Connected"
    };
  } catch (e) {
    console.error('error!', e);
    return {
      statusCode: 501,
      body: "Failed to connect: " + JSON.stringify(e),
    };
  }
};

And here’s the disconnect Lambda code:

JavaScript
// disconnect.js
const DynamoDB = require('aws-sdk/clients/dynamodb');

exports.handler = async function(event, context, callback) {
  const db = new DynamoDB.DocumentClient();
  var deleteParams = {
    TableName: process.env.TABLE_NAME, // In our case, "WebSocketManager"
    Key: {
      ConnectionId: event.requestContext.connectionId,
    }
  };

  try {
    // If the client dis
    await db.delete(deleteParams).promise();
    return {
      statusCode: 200,
      body: "Disconnected"
    }
  } catch (e) {
    console.error('error!', e);
    return {
      statusCode: 501,
      body: "Failed to disconnect: " + JSON.stringify(e),
    };
  }
};

There’s not much to the connect/disconnect code. I’m just inserting and deleting connection IDs as they come in from events.

Next, I created a WebSocket API in API Gateway. When creating the API, I attached the connect and disconnect Lambda functions to their respective integrations:

attach-integrations-590x346.png

WebSocket Client

Now that I have the WebSocket API built, I connect to it. On my project, I installed wscat to connect to the API. I can run this command to connect to the API:

yarn wscat -c WEBSOCKET_URL_GOES_HERE

The WebSocket URL can be found in the API stage panel in the AWS console:

Screen-Shot-2020-12-12-at-10.53.32-PM-590x62.png

After getting a successful connection to the API, I hooked my DynamoDB stream to a Lambda function that can send events to my computer via WebSockets.

DynamoDB Stream

I needed to create another Lambda function to act as a trigger for DynamoDB stream events. This trigger function needed to look up all connections to my WebSocket API and send a message to each connection via API Gateway. Here’s what the code looks like:

JavaScript
// trigger.js
const ApiGatewayManagementApi = require('aws-sdk/clients/apigatewaymanagementapi');
const DynamoDB = require('aws-sdk/clients/dynamodb');

exports.handler = async function(event, context, callback) {
  const db = new DynamoDB.DocumentClient();
  let connections;

  try {
    connections = await db.scan({ TableName: process.env.TABLE_NAME, ProjectionExpression: 'Id' }).promise();
  } catch (e) {
    return { statusCode: 500, body: e.stack };
  }

  const api = new ApiGatewayManagementApi({
    endpoint: process.env.ENDPOINT,
  });

  const postCalls = connections.Items.map(async ({ Id }) => {
    await api.postToConnection({ ConnectionId: Id, Data: JSON.stringify(event) }).promise();
  });

  try {
    await Promise.all(postCalls);
  } catch (e) {
    return { statusCode: 500, body: e.stack };
  }

  return { statusCode: 200, body: 'Event sent.' };
};

The code is pretty straightforward. In the Lambda, I needed to make sure to set up my environment variables properly. TABLE_NAME is the DynamoDB table storing my connections (not the table I’m streaming changes from). ENDPOINT is similar to the URL I used to connect to the WebSocket API, replacing the wss:// with https://.

With the WebSocket API in place, I enabled streaming changes from my DynamoDB. For my project, I wanted to stream changes for the table that contained user profile data. Navigating to the DynamoDB table in the AWS console, I enabled streaming in the “Overview” tab under the section “DynamoDB Stream Details” by clicking “Manage DynamoDB Stream.”

Next, I created a trigger for the same table by navigating to the “Trigger” tab and clicking “Create Trigger.” After that, I could see changes to the records in that table streaming directly to any client connected to my WebSocket.

We’re Done! 🥳

WebSockets are a cool technology with a lot of potential for the projects I work on. While I’m not too interested in streaming DynamoDB changes, this experiment introduced me to WebSockets in AWS. I hope to continue exploring more AWS services in future side projects.


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK