Stateful AWS lambdas part 1: polling SQS

Lambdas are presented as a stateless product, but there are a few good reasons (and many more bad ones) to get around this and do more than a single operation on each invocation.

Suppose you want to poll an SQS queue, but only for 5 minutes a day. You could launch an EC2 instance on a schedule, but bootstrapping will be slow and you’ll still pay for a full hour, which is not great. Lambda is meant to be event-based (i.e. something triggers your lambda), so polling can be a bit tricky.

Here’s a pretty easy way to get around this limitation. The gist of it is:

On the AWS side

  • set your lambda timeout to the maximum time you’ll need to run your code
  • create a cron-like schedule expression to run your job as (not) often as you like
  • make sure your lambda role has permission to receive and delete messages on the SQS queue (sqs:ReceiveMessage and sqs:DeleteMessage)
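
To make the AWS-side settings a bit more concrete, here is a rough sketch of them written as plain JavaScript objects. Treat it as an illustration only: the helper name buildPollingPolicy, the account id in the queue ARN and the daily schedule are all placeholders I made up. The two IAM actions are the ones needed to receive and delete messages, and the 300-second timeout is assumed to match the 5-minute lambdaTimeout used in the code further down.

```javascript
// Hypothetical helper: builds the IAM policy document for the lambda role.
// It only grants what a polling loop needs: receiving and deleting messages.
function buildPollingPolicy(queueArn) {
    return {
        Version: '2012-10-17',
        Statement: [{
            Effect: 'Allow',
            Action: ['sqs:ReceiveMessage', 'sqs:DeleteMessage'],
            Resource: queueArn
        }]
    };
}

// Placeholder values, not taken from a real account
var settings = {
    timeoutSeconds: 300,                      // lambda timeout, assumed to match lambdaTimeout (300000 ms)
    scheduleExpression: 'cron(0 12 * * ? *)', // AWS schedule syntax: once a day at 12:00 UTC
    policy: buildPollingPolicy('arn:aws:sqs:us-east-1:123456789012:your-queue-name')
};

console.log(JSON.stringify(settings, null, 2));
```

None of this is deployed by the snippet itself; it only prints the values you would enter in the console or in whatever deployment tool you use.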

In your code

  • keep a timer running and fire off your callback before your lambda times out
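  • during that time, run your code as usual

var exitTimeout = false; // flipped to true once we are close to the lambda timeout
var lambdaTimeout = 300000; // 5 minutes, in milliseconds; must match the timeout set on the lambda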
var AWS = require('aws-sdk');
var sqs = new AWS.SQS({'region': 'us-east-1'});
var queueUrl = 'https://sqs.us-east-1.amazonaws.com/account-id-probably/your-queue-name';

function main(callback) {
    exitTimeout = false;

    setTimeout(function () {
        exitTimeout = true;
    }, lambdaTimeout - (1000 * 25)); // stop polling 25 seconds before the lambda timeout (5 minutes here), leaving time to finish whatever work the loop was doing

    function poll() {
        if (exitTimeout === true) {
            console.log('Out of time, exiting');
            return callback(null);
        } else {
            console.log('Still got time...polling the queue');
        }

        return sqs.receiveMessage({'QueueUrl': queueUrl}).promise()
            .then(result => {
                if (!result.Messages) return []; // no messages were found in the queue, return an empty array of messages to delete
                var deleteOperations = []; // this is where we store the messages to delete
                result.Messages.forEach(message => {
                    // do some interesting processing here!
                    console.log(`MessageId: ${message.MessageId}`);

                    // line up all messages to be deleted
                    deleteOperations.push(sqs.deleteMessage({
                        QueueUrl: queueUrl,
                        ReceiptHandle: message.ReceiptHandle
                    }).promise())
                });
                return deleteOperations;

            })
            .then((data) => Promise.all(data)) // delete all messages at once
            .catch(console.log) // log any error and carry on, so one failure does not stop the loop
            .then(() => {
                console.log('Waiting a bit before polling again');
                setTimeout(poll, 500); // call ourselves again to keep polling the queue
            });
    }

    poll();
}

exports.handler = function (event, context, callback) {
    return main(callback);
};

A few notes on the code above:

  • lambdaTimeout should match (in milliseconds) the timeout value on your lambda
  • I’ve decided in this example to exit at lambdaTimeout – 25 seconds, since long polling on SQS was set to 20 seconds on my queue (giving my lambda 5 seconds to process the data and delete the message). This is completely arbitrary though, and depends on the “Receive Message Wait Time” you have set on your queue, and the work you intend to do on each message.
  • Notice how we call poll() again at the end of the promise chain: the function re-schedules itself, so the loop is effectively recursive
  • There is a timeout of 500ms before calling poll(). It is not strictly necessary, but for some types of work it is a good idea, to avoid throttling.
  • Don’t forget that if your lambda invocation fails, it may retry. Depending on what you’re doing in the poll() loop, you may want to ignore/silence errors and keep going (notice how nothing in this code rejects; errors are only caught and logged).
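
The margin arithmetic from the notes above can be sketched as a small helper. This is only an illustration under a few assumptions: computeExitDelay and exitDelayFromContext are hypothetical names, and the 20-second wait time and 5-second processing budget are simply the values from this example. Instead of a hard-coded lambdaTimeout, the sketch reads the remaining time from context.getRemainingTimeInMillis(), which the Lambda Node.js runtime provides on the context object, so the code and the configured timeout cannot drift apart.

```javascript
// Hypothetical helper: how long to keep polling before firing the callback.
// waitTimeSeconds is the queue's "Receive Message Wait Time" (long polling);
// processingBudgetMs is an assumed allowance for processing and deleting messages.
function computeExitDelay(remainingMs, waitTimeSeconds, processingBudgetMs) {
    var margin = (waitTimeSeconds * 1000) + processingBudgetMs;
    // never return a negative delay: if the margin exceeds the time left, stop right away
    return Math.max(0, remainingMs - margin);
}

// Inside a handler, the remaining time can come from the lambda context
// rather than from a constant that has to be kept in sync by hand.
function exitDelayFromContext(context) {
    return computeExitDelay(context.getRemainingTimeInMillis(), 20, 5000);
}

// Stand-in for the real context object, for trying this outside of Lambda
var fakeContext = {
    getRemainingTimeInMillis: function () { return 300000; }
};

console.log(exitDelayFromContext(fakeContext)); // 275000
```

In the handler, the result would replace the lambdaTimeout - (1000 * 25) expression passed to setTimeout, with context handed down from exports.handler.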