Puff the magic Dynamo Dragon

2020-06-02

A year and a bit back we had the inclination to stick it to The Man and hack a system to replace a 10% Filenet rig with S3 and DynamoDB. Obviously, we had to kick its tyres (write test hacks) and chew bubblegummy tobacky (google the crap out of it). This post is about the preparing phase of the POC aka how to load a billion fake records into DynamoDB.

Our goal was to reach the ‘Holy Grail’ of fake inflating a Dynamo table to one bieeeeellion records. The one billion records comprised 100 million file entries with 10 tags each (fields to search on) and we wanted to see what the performance degradation was searching those indexes for the tags, to de-reference S3 keys of the actual files.

Since node is the mostest bestest API ass-kicking tool ever (just ask the folks at artillery), we made it hit Dynamo from multiple sides. Being an event-hipster, our little javascript hack managed to keep a 40 000 Write Capacity Unit table fully occupied on an m5.something-small ec2 instance (node never hit >80MB memory and obviously only hogged 1 CPU).

Once we puffed the table up with fake-o-random, we got to test a rig that had node’s fat ass memory hog distant cousin (Java) query Dynamo. It turns out AWS wasn’t lying and, if you use DynamoDB correctly, it is frigging fast. We pretty much got the same response times searching 1 000 records as searching 1 000 000 000.

Eventually The Man won, and we bought more junk from Hitouchy tin men and soon to be Irrelevant Bleed Machine second-hand software salesmen, but at least I got a back-the-f-off gas-emitting DynamoDB loader out of it.

Here it is in all its hack-and-slash glory…

'use strict';
let AWS = require('aws-sdk');
let uuid = require('uuid');
let events = require('events');

// Send every request to the eu-west-1 region.
AWS.config.update({region: 'eu-west-1'});
// Lock the low-level client to DynamoDB API version 2012-08-10.
let ddb = new AWS.DynamoDB({apiVersion: '2012-08-10'});

// Pool of lorem-ipsum words used as fake tag names.
// Note: "illo" and "pariatur" each occur twice in the list.
let tags = [
  "Adipisicing", "porro", "natus", "accusamus", "ducimus", "aspernatur",
  "Consectetur", "vitae", "possimus", "asperiores", "architecto", "Maxime",
  "quidem", "amet", "quisquam", "voluptates", "Suscipit", "velit",
  "odio", "Placeat", "sunt", "aut", "repudiandae", "illo",
  "pariatur", "Sit", "lorem", "fugit", "sapiente", "corporis",
  "voluptas", "Totam", "exercitationem", "quam", "illo", "laboriosam",
  "Distinctio", "blanditiis", "voluptatum", "tenetur", "Rerum", "pariatur",
  "obcaecati", "tempore", "reiciendis", "Quae", "debitis",
];

// Number of writer chains kicked off by the start-up loop at the bottom of the script.
const limit = 20;

/**
 * Turn the optional CLI argument into a usable starting file counter.
 *
 * The raw argument is a string; keeping it as a string would make later
 * arithmetic such as `fileCount + 1` concatenate instead of add.
 *
 * @param {string|undefined} rawArg - value of process.argv[2], if any
 * @returns {number} a non-negative integer; 0 when the argument is missing or not numeric
 */
function parseStartingFileCount(rawArg) {
  const parsed = Number.parseInt(rawArg, 10);
  return Number.isNaN(parsed) || parsed < 0 ? 0 : parsed;
}

// Running file counter, seeded from the first CLI argument (defaults to 0).
let fileCount = parseStartingFileCount(process.argv[2]);
// Running total of items handed to batchWriteItem (re-queued items are counted again when re-sent).
let recordCount = 0;
// Files generated per queue refill.
const loadCount = 1000;
// Generation stops once fileCount reaches this value.
const maxFiles = 10000000;
// Items per batchWriteItem call; 25 is the most a single BatchWriteItem request accepts.
const maxBatchItems = 25;
// Current delay (ms) a worker waits between batches; adjusted up and down at runtime.
let backOffTime = 5000;
// Timestamp (ms since epoch) of the most recent response that returned unprocessed items.
let lastUnprocessedTime = 0;
// Step (ms) by which backOffTime grows; also scales the cool-down window and the start-up jitter.
const unProcessedBackOffTime = 200;

// Event bus carrying the 'grabTheNextOne' and 'putTheNextOne' events between the writer and the queue filler.
let emitter = new events.EventEmitter();
// In-memory queue of pending PutRequest entries; items DynamoDB returns as unprocessed are pushed back here too.
var unprocessedItems = []

/**
 * One worker step: pop up to maxBatchItems entries off unprocessedItems and
 * send them to the `filenet` table with a single batchWriteItem call.
 *
 * When the response arrives:
 *  - leftovers (UnprocessedItems, or the whole batch if the request was
 *    rejected with ProvisionedThroughputExceededException) are pushed back on
 *    the queue and backOffTime grows by unProcessedBackOffTime;
 *  - a clean write shrinks backOffTime by half a step, but only once
 *    5 * unProcessedBackOffTime ms have passed since the last leftovers;
 *  - the worker re-arms itself by emitting 'grabTheNextOne' after backOffTime ms.
 * Any other error is logged and terminates the process with exit code 1.
 *
 * 'putTheNextOne' is always emitted before returning so the filler can top up
 * the queue.
 *
 * @param {number} index - worker identifier; used for logging and passed along when re-arming
 */
function fetchNext(index){
  let requestItems = {
    RequestItems: {
      filenet: []
    }
  }
  let batch = requestItems.RequestItems.filenet
  while (batch.length<maxBatchItems && unprocessedItems.length>0) {
    batch.push(unprocessedItems.pop())
  }
  let itemCount = batch.length
  recordCount+=itemCount
  console.log(index,unprocessedItems.length,itemCount,recordCount,backOffTime)
  // An empty batch means the queue was empty, so there is nothing to send and
  // this worker is deliberately not re-armed.
  if (itemCount>0) {
    ddb.batchWriteItem(requestItems, (e,d) => {
      let leftovers
      if (e) {
        if (e.code !== 'ProvisionedThroughputExceededException') {
          console.log(index,"Error",e)
          process.exit(1)
          return
        }
        // The whole batch was throttled: nothing was written, so retry all of it.
        console.log(index,'Throttled',e.code)
        leftovers = batch
      } else {
        leftovers = d.UnprocessedItems.filenet
      }
      if (leftovers) {
        backOffTime += unProcessedBackOffTime
        console.log(index,'+backOffTime',backOffTime)
        lastUnprocessedTime = new Date().getTime()
        leftovers.forEach( x => {
          unprocessedItems.push(x)
        })
      } else if (backOffTime > 0 && new Date().getTime() - lastUnprocessedTime > unProcessedBackOffTime*5) {
        backOffTime -= Math.floor(unProcessedBackOffTime / 2)
        console.log(index,'-backOffTime',backOffTime)
      }
      setTimeout( () => {
        emitter.emit('grabTheNextOne',index);
      },backOffTime)
    })
  }
  emitter.emit('putTheNextOne');
}

// Every 'grabTheNextOne' event (emitted with a worker index) runs one fetchNext step.
emitter.on('grabTheNextOne', fetchNext);

// Wrap one (FileId, Tag) pair in the PutRequest shape queued for batchWriteItem.
function makePutRequest (fileId, tag) {
  return {
    PutRequest: {
      Item: {
        FileId: { S: fileId },
        Tag: { S: tag }
      }
    }
  }
}

/**
 * Queue filler: when fewer than loadCount*maxTags entries are waiting and
 * fileCount is still below maxFiles, generate up to loadCount fake files and
 * push their write requests onto unprocessedItems.
 *
 * Each file gets a fresh uuid-based FileId and contributes:
 *  - one item per distinct randomly chosen tag, as `<tag>|<fileCount>`
 *    (maxTags of them, or fewer if the tag pool has fewer distinct entries);
 *  - `LoremIpsum|<fileCount+1>`;
 *  - `LoremIpsum|<doubleRef>`, where doubleRef is fileCount plus a random
 *    offset in [fileCount, fileCount+100).
 * fileCount is incremented once per generated file and never exceeds maxFiles.
 */
function putNext () {
  const maxTags=10;
  if (fileCount < maxFiles && unprocessedItems.length < loadCount*maxTags) {
    console.log('loading',loadCount,fileCount);
    // Never ask for more distinct tags than the pool can supply, otherwise the
    // picking loop below could not terminate.
    const tagsPerFile = Math.min(maxTags, new Set(tags).size)
    for (let i=0; i<loadCount && fileCount<maxFiles; i++) {
      const fileId = 'fileid-'+uuid()
      // A Set keeps insertion order and silently drops repeated picks.
      const tagBag = new Set()
      while (tagBag.size<tagsPerFile) {
        // Draw from the whole pool rather than a fixed-size prefix of it.
        tagBag.add(tags[Math.floor(Math.random()*tags.length)])
      }
      tagBag.forEach( k => {
        unprocessedItems.push(makePutRequest(fileId, k+'|'+fileCount))
      })
      const doubleRef = fileCount+Math.floor(Math.random()*100+fileCount)
      unprocessedItems.push(makePutRequest(fileId, 'LoremIpsum|'+(fileCount+1)))
      unprocessedItems.push(makePutRequest(fileId, 'LoremIpsum|'+doubleRef))
      fileCount++
    }
  }
}

// Let the filler run whenever a worker signals 'putTheNextOne'.
emitter.on('putTheNextOne', putNext);

// Seed the queue before any worker starts.
putNext();

// Fire a worker's first 'grabTheNextOne' after a random delay that scales with
// its index (worker 0 therefore always starts with a delay of 0).
const startWorker = (workerIndex) => {
  const startDelay = Math.floor(Math.random()*unProcessedBackOffTime*workerIndex)
  setTimeout( () => {
    emitter.emit('grabTheNextOne',workerIndex);
  }, startDelay)
}

// Launch `limit` workers, indexed 0..limit-1.
let nextWorker = 0
while (nextWorker < limit) {
  startWorker(nextWorker)
  nextWorker++
}

The code uses two ping pong events:

  • putTheNextOne - indicates the queue filling fun pushing function (qffpf) should check if it needs to dump loadCount items into the queue

  • grabTheNextOne - indicates that the queue bubble wrap popping function (qbwpf) should take 25 of those unprocessedItems and call-a-roony old batchWriteItem and deal with the fallout of hitting the WCU limit (basically add backOffTime and push the leftovers back onto the queue). If there were no throwbacks from the write and there is still some backOffTime it reduces it.

ps - if you think qffpf and qbwpf have some computer sciency significance, besides sounding cool, ‘queue-fffip-pifff’ and ‘queue-buh-whip-piff’ don’t.

At the end of the script, after putNext forces some dope into the unprocessedItems to get the party started, it pretty much schedules 20 (limit) fetchNext workers (each one does a batchWriteItem of up to 25 items per go) that get eased into processing at random times. After they suck some out of unprocessedItems they ping putNext to let it see if it needs to replenish the queue.

If you run this, you should see Cloudwatch telling you your WCU’s are getting knocked around and should jigsaw above and below the WCU limit. It is kind of like a self-regulating event system - SRES (there, I coined it, and no, it also isn’t a thing).

It ain’t quite what the peeps meant around Exponential backoff and also not quite what these dudes mean with Jitter’ed backoff, but it is cute to at least have something that yoyos around a target rate and can bulk whack and keep pressure on an API.

Anyhoos

  • If you want to really amp it up, use cluster and let the workers hit more CPU’s, but for our test we easily kept hitting the WCU limits.
  • You can easily make use of streaming like I did in this flowlog-regex-like-it’s-1985 post. This would feed the queue from some source, like S3 for example.
  • If you want a more straightforward example of an S3 sucker upper tool, check out cousin R’s blog post