With the Lambda trigger defined as above, we rely on the Serverless Framework to set its parameters. Some default values generated by the SF are different from CloudFormation defaults for the Lambda trigger (AWS::Lambda::EventSourceMapping resource). That could be a sufficient argument to define them explicitly: there is a smaller chance that someone will assume their values incorrectly by looking at the wrong documentation.
However, what’s even more important is the fact that the default values will rarely suit us in practice.
Since this is A General Pitfall, let’s break it into smaller ones by looking at what we can do wrong (and how to fix it!) with different parameters.
Why is it bad?
With only one record processed each time, we have more Lambda invocations to pay for. But that’s not all. We probably would like to do something with that record. Analyze, filter, and send to some API endpoint, database, etc. Those operations can usually take a batch of items – for example, we can add multiple items to the database with a single query. Processing and sending 10 or 100 records in a batch is usually much faster than doing it 10 or 100 times separately.
On top of that, we generate a load on the target service with each call. That increases its resource usage and/or costs. Business requirements often say “real-time processing”, but in reality, often a delay of 3, 5, or 15 seconds does not make any difference.
On the other hand, if we have a backlog of items to process, we take only 10. Is it optimal? Wouldn’t it be better (faster and cheaper) to process 100 at once or, for that matter, 1000? Of course, the answer to this question will depend on what we actually do, our function logic, and the external calls we make. But we need to think about it and choose some value consciously.
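To make the batching argument concrete, here is a minimal sketch of a handler that decodes every record in the invocation and stores them with a single call. It assumes the record payloads are JSON. The `save_batch` callable is hypothetical and stands in for whatever bulk operation the target database or API offers.

```python
import base64
import json
from typing import Any, Callable, Dict, List


def decode_records(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Decode every Kinesis record in the invocation into a dict."""
    items = []
    for record in event["Records"]:
        # Kinesis delivers each payload base64-encoded under kinesis.data.
        raw = base64.b64decode(record["kinesis"]["data"])
        items.append(json.loads(raw))  # assumption: payloads are JSON
    return items


def make_handler(save_batch: Callable[[List[Dict[str, Any]]], None]):
    """Build a Lambda handler that stores the whole batch with one call.

    save_batch is a hypothetical bulk writer supplied by the caller.
    """
    def handler(event, context):
        items = decode_records(event)
        if items:
            save_batch(items)  # one call for the whole batch, not one per record
        return {"processed": len(items)}

    return handler
```

With a batch of 100 records, the target service sees one write instead of 100, which is where the time and cost savings described above come from.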
With two parameters: batchSize and batchWindow.
batchSize sets the maximum number of records processed by the Lambda function at once. batchWindow sets the maximum number of seconds to wait and accumulate records before triggering the Lambda.
Here we will set batchSize to 100 and batchWindow to 5 seconds.
What does it mean?
Lambda will trigger when there are 100 records in a stream shard, after 5 seconds, or after accumulating 6 MB of payload size (built-in limit), whichever happens first.
Again, choosing those values depends on the business requirements. Not choosing them is rarely a valid option.
If we worked with SQS before and expect the processing to be retried three times until the record is rejected, we will have a very unpleasant surprise. By default, Lambda will try to process the batch 10,000 times, the maximum possible number of retries. Moreover, it will not process further records until the problem is resolved. This is due to the Kinesis guarantee of in-order processing.
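The blocking effect can be pictured with a small simulation. This is a simplified model, not Lambda's actual implementation: `drain_shard`, `failing_handler`, and `max_retries` are names made up for the illustration, and real retries are also bounded by the age of the records.

```python
from typing import Callable, List, Sequence, Tuple


def drain_shard(
    batches: Sequence[list],
    handler: Callable[[list], None],
    max_retries: int,
) -> List[Tuple[int, int, str]]:
    """Process batches strictly in order and log (index, attempts, outcome)."""
    log = []
    for index, batch in enumerate(batches):
        attempts = 0
        while True:
            attempts += 1
            try:
                handler(batch)
                log.append((index, attempts, "ok"))
                break
            except Exception:
                # Later batches wait here until this one succeeds
                # or the retry budget is used up.
                if attempts > max_retries:
                    log.append((index, attempts, "dropped"))
                    break
    return log


def failing_handler(batch: list) -> None:
    """Hypothetical handler that cannot process the record 'bad'."""
    if "bad" in batch:
        raise ValueError("cannot process record")


print(drain_shard([[1, 2], ["bad"], [3]], failing_handler, max_retries=3))
# → [(0, 1, 'ok'), (1, 4, 'dropped'), (2, 1, 'ok')]
```

With three retries, the faulty batch is attempted four times before the third batch gets its turn. With 10,000 retries, the third batch would wait correspondingly longer.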
Of course, we may want to repeat the execution 10,000 times. Maybe the problem lies not in the data but in an external system we call. But more often, we want to skip the problematic message and continue with the next ones, preventing our systems from hanging with old data.
We will start with reducing the number of retries. To do so, we will set the maximumRetryAttempts.
While we can get notified about failures from the Lambda metrics, it’s generally a good idea to send info about unprocessed records to a Dead Letter Queue. This way, when we find and fix the faulty Lambda logic causing the problem, we can re-ingest dropped records. Our DLQ can be an SQS queue or an SNS topic that we reference under destinations.onFailure.
Here a clarification is needed: the records themselves are not sent to the DLQ, only information about them. Each message will contain details about the failed batch position in the stream. With it, we can later get the records directly from Kinesis as long as they have not passed the retention period. Here is an example of such a message:
For that, the solution is a bisectBatchOnFunctionError option. When set to true, each time the Lambda execution fails, the batch will be split in two and each half retried separately. Depending on the batch size and number of retries, we may eventually isolate the single malformed record. After discarding it, we can successfully process all others.
Contrary to SQS, messages in Kinesis are not removed after being read by the client. Instead, each client keeps track of the last record it read. Lambda, of course, does it for us.
When we deploy a new Lambda function with a Kinesis stream as a trigger, it will start by reading all the existing records from the stream by default. Depending on the configured stream retention period, that can mean all messages from even the last 365 days.
Needless to say, this may not be what we aim to do. It can take long hours before we process all the old data and catch up with the current records. Not to mention the bill it can cause.
For those reasons, if we want to process only the new messages incoming from the moment we deploy our function, we need to set the startingPosition explicitly. The default value is TRIM_HORIZON, which starts from the oldest record available. To start from the latest record at the moment of function deployment, we change it to LATEST.
Kinesis Data Stream consists of shards, and we pay for the number of them. Each shard can receive up to 1 MB or 1,000 records per second. However, the fact we have enough throughput to ingest messages into the shard doesn’t mean we can read and process them at the same rate.
The other solution is to use the parallelizationFactor property. It enables processing messages from a single shard in up to 10 concurrent executions. Despite reading from the shard in parallel, the order of records with the same partition key is still maintained. Thus, increasing parallelization allows safely processing a higher data volume without increasing the costs of Kinesis. However, the max total read throughput per shard still applies.
As always, we should monitor our Lambda function for failures, timeouts, and throttles. We can do it directly in AWS by creating individual alarms for each of these metrics. Another option is to use Dashbird, which will monitor our function’s health out of the box without any additional configuration.
But when dealing with Kinesis, it’s not enough. Even if Lambda works correctly, that doesn’t mean the whole system runs smoothly. We may have more records incoming to the stream than we can process. There may be multiple reasons, like traffic spikes or increased latency of external service used by the Lambda.
That’s why one of the most crucial Kinesis metrics to keep an eye on is the Iterator Age. It tells us how long the message was in the stream before the Lambda read it. The growing age of records is automatically detected and reported by Dashbird for all streams with no additional setup.
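Lambda already publishes an IteratorAge metric to CloudWatch, so the following is only an illustration of what the number means. It is a minimal sketch that computes, inside the handler, how long the oldest record of the current batch waited in the stream. The function name `oldest_record_age_seconds` is hypothetical; it relies on the `approximateArrivalTimestamp` field (epoch seconds) that each Kinesis record carries in the Lambda event.

```python
import time
from typing import Any, Dict, Optional


def oldest_record_age_seconds(
    event: Dict[str, Any], now: Optional[float] = None
) -> float:
    """Return how long the oldest record in the batch waited in the stream."""
    now = time.time() if now is None else now
    arrivals = [
        record["kinesis"]["approximateArrivalTimestamp"]
        for record in event["Records"]
    ]
    if not arrivals:
        return 0.0
    # Clamp at zero in case of small clock differences.
    return max(0.0, now - min(arrivals))
```

Logging this value at the start of each invocation makes it easy to see whether the function keeps up: a figure that grows from one invocation to the next means records arrive faster than we process them.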
Fortunately, Dashbird provides insights and auto-detection of such scenarios as well. In addition to reporting read and write throttles or failing processing, it also alerts about abandoned streams with no new incoming data.
There is also a bright side, a small bonus. When we learn how to work with Kinesis, most of our knowledge will also apply to handling DynamoDB Streams. We can utilize most of the configuration options and solutions mentioned above when the Lambda function processes changes from DynamoDB.
Full code for the Serverless Framework configuration, Lambda function handler, and dummy records producer is available on my GitHub: