Today, we're excited to share the architecture for Centrifuge, Segment's system for reliably sending billions of messages per day to hundreds of public APIs. This post explores the problems Centrifuge solves, as well as the data model we use to run it in production.
At Segment, our core product collects, processes, and delivers hundreds of thousands of analytics events per second. These events consist of user actions like viewing a page, buying an item from Amazon, or liking a friend's playlist. No matter what the event is, it's almost always the result of some person on the internet doing something.
We take these incoming events and forward them to hundreds of downstream endpoints, from third-party analytics and marketing tools to per-customer webhooks. At any point in time, dozens of these endpoints will be in a state of failure. We'll see 10x increases in response latency, spikes in 5xx status codes, and aggressive rate limiting for single large customers. To give you a flavor, here are the sorts of latencies and uptimes I pulled from our internal monitoring earlier today.

In the best case, these API failures cause delays. In the worst case, data loss. As it turns out, delivering that many requests in a faulty environment is a complex problem. You have to think hard about fairness (what data should you prioritize?), buffering semantics (how should you enqueue data?), and retry behavior (does retrying now add unwanted load to the system?).
Across all of the literature, we couldn't find much good 'prior art' for delivering messages reliably in high-failure environments. The closest analogues come from the networking world, but that discipline has very different strategies concerning buffer allocation (very small) and routing (adaptive, and usually to a single place).
So we decided to build our own general-purpose, fully distributed job scheduler to schedule and execute HTTP requests reliably. We've called it Centrifuge. You can think of Centrifuge as the layer that sits between our infrastructure and the outside world: it's the system responsible for sending data to all of our customers' destinations. When third-party APIs fail, Centrifuge is there to absorb the traffic.

Under normal operation, Centrifuge has three responsibilities: it delivers messages to third-party endpoints, retries messages upon failure, and archives any undelivered messages. We've written this first post as a guide to the problems Centrifuge solves, its data model, and the building blocks we've used to operate it in production. In subsequent posts, we'll share how we've verified the system's correctness and made it blindingly fast. Let's dive in.
The problem with using any sort of queue is that you are fundamentally limited in terms of how you access data. After all, a queue only supports two operations (push and pop).
To see where queues break down, let's walk through a series of queueing topologies that we've implemented at Segment. The simplest is a single shared queue feeding every endpoint.

This works okay for a while, but what happens when we start seeing a single endpoint get slow? Unfortunately, it creates backpressure on the entire message flow.
Clearly, this isn’t ideal. If a single endpoint can bring down the entire pipeline, and each endpoint has an hour-long downtime each year (99.9% available), then with 200+ endpoints, we’ll be seeing hour-long outages once per day.
The obvious next step is to give each endpoint its own queue, so that one slow destination only delays its own messages. Unfortunately, even this approach has problems in practice. If we look at the distribution of messages which should be delivered to a single endpoint, things become a little more nuanced. Segment is a large, multi-tenant system, so some sources of data will generate substantially more load than others. As you might imagine, among our customer base, this follows a fairly consistent power law:
When that translates to messages within our queues, the breakdown looks more like this:
In this case, we have data for customers A, B, and C, all trying to send to the same downstream endpoint. Customer A dominates the load, but B and C have a handful of calls mixed in. Let's suppose that the API endpoint we are sending to is rated to 1,000 calls per second, per customer. When the endpoint receives more than 1,000 calls in a second for a given customer API key, it will respond with an HTTP 429 (rate limit exceeded). Now let's assume that customer A is trying to send 50,000 messages to the API. Those messages are all ordered contiguously in our queue, so at that rate limit it will take nearly a minute to drain them. At this point, none of our options are good: we can keep retrying customer A's messages at the head of the queue, blocking customers B and C the whole time, or we can start dropping data.
Instead, we want an architecture that looks more like the following diagram, where we have separate queues per combination of customer and endpoint. This architecture gives us much better isolation, as well as the ability to dynamically adjust throughput on a per-customer basis.
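To make the isolation idea concrete, here's a minimal in-process sketch of keying work by the customer-and-endpoint pair. None of this is Centrifuge's actual code; the types and the channel-per-pair scheme are purely illustrative of the topology described above.

package isolation

import "sync"

// queueKey identifies one customer/endpoint combination. The field names
// are illustrative, not Centrifuge's real identifiers.
type queueKey struct {
	SourceID    string // the customer's source of data
	Destination string // the downstream endpoint
}

type job struct {
	Key     queueKey
	Payload []byte
}

// isolator gives every (source, destination) pair its own queue, so a slow
// endpoint for one customer can't back up anyone else's traffic.
type isolator struct {
	mu     sync.Mutex
	queues map[queueKey]chan job
}

func newIsolator() *isolator {
	return &isolator{queues: make(map[queueKey]chan job)}
}

// enqueue lazily creates the per-pair queue and starts a dedicated worker
// for it; throughput can then be tuned per pair without affecting the rest.
func (i *isolator) enqueue(j job, deliver func(job)) {
	i.mu.Lock()
	q, ok := i.queues[j.Key]
	if !ok {
		q = make(chan job, 1024)
		i.queues[j.Key] = q
		go func() {
			for item := range q {
				deliver(item)
			}
		}()
	}
	i.mu.Unlock()
	q <- j
}

The catch, as the next paragraph explains, is the sheer number of queues this requires.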
However, in a large, multi-tenant system like Segment, this number of queues becomes difficult to manage. We have hundreds of thousands of these source-destination pairs in total; today, 42,000 active sources of data send to an average of 2.1 downstream endpoints each, which already works out to roughly 88,000 queues we'd like to support (and we're growing quickly). To implement per source-destination queues with full isolation, we'd need hundreds of thousands of different queues. Across the hosted and open-source queueing systems we evaluated, we haven't seen any which support that level of cardinality with simple scaling primitives, and the one queue we found that does manage it is totally cost prohibitive at our scale. We need a new primitive to solve this problem of high-cardinality isolation.
We also want the ability to quickly recover from errors without having to shuffle terabytes of data around the network, and none of the queue-based approaches we explored can do that efficiently.
Looking back at our requirements, we want a way of quickly altering the delivery order for our jobs, without having to create many copies of the jobs themselves.
A queue won’t solve this problem in-place for us. Our consumer would have to read and then re-write all of the data in our new ordering. But a database, on the other hand, does.
By storing the execution order inside a relational database, we can immediately change the delivery order by running a single SQL statement. Similarly, whenever we want to change the delivery semantics for our messages, we don't have to re-shuffle terabytes of data or double-publish to a new datastore. Instead, we can just deploy a new version of our service, and it can start using the new queries right away.

Using a database gives us the flexibility in execution that queues are critically lacking. For that reason, we decided to store Centrifuge data inside Amazon's RDS instances running MySQL. RDS gives us managed datastores, and MySQL provides us with the ability to re-order our jobs. Here's the schema for the jobs table:

mysql> describe jobs;
+----------------------+----------------+------+-----+---------+-------+
| Field                | Type           | Null | Key | Default | Extra |
+----------------------+----------------+------+-----+---------+-------+
| id                   | binary(27)     | NO   | PRI | NULL    |       |
| bucket               | varbinary(64)  | NO   |     | NULL    |       |
| endpoint             | varbinary(255) | NO   |     | NULL    |       |
| headers              | mediumblob     | NO   |     | NULL    |       |
| payload              | mediumblob     | NO   |     | NULL    |       |
| execution_timeout_ms | int(11)        | NO   |     | NULL    |       |
| backoff_min_delay_ms | int(11)        | NO   |     | NULL    |       |
| backoff_coefficient  | float          | NO   |     | NULL    |       |
| created_at           | datetime(6)    | NO   |     | NULL    |       |
| expire_at            | datetime(6)    | NO   |     | NULL    |       |
+----------------------+----------------+------+-----+---------+-------+
While the endpoint, payload, and headers fields govern message transmission, the expire_at field is used to indicate when a given job should be archived. By splitting expire_at into a separate field, our operations team can easily adjust it if we ever need to flush a large number of failing messages to S3, so that we can process them out-of-band.
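As an example of the kind of knob this gives us, here's a hypothetical helper (a sketch, not our actual tooling) that pulls a bucket's expiration forward so the next archiving pass ships its backlog to S3:

package ops

import (
	"context"
	"database/sql"
)

// FlushBucket moves expire_at up to "now" for every job in one bucket, so
// those jobs stop retrying in the hot path and get archived to S3 instead.
// The bucket and expire_at columns come from the jobs schema above; the
// helper itself is illustrative.
func FlushBucket(ctx context.Context, db *sql.DB, bucket []byte) (int64, error) {
	res, err := db.ExecContext(ctx,
		`UPDATE jobs SET expire_at = NOW(6) WHERE bucket = ?`, bucket)
	if err != nil {
		return 0, err
	}
	return res.RowsAffected()
}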
mysql> show indexes from jobs;
+-------+------------+----------+--------------+-------------+-----------+-------------+------------+
| Table | Non_unique | Key_name | Seq_in_index | Column_name | Collation | Cardinality | Index_type |
+-------+------------+----------+--------------+-------------+-----------+-------------+------------+
| jobs  |          0 | PRIMARY  |            1 | id          | A         |     2344484 | BTREE      |
+-------+------------+----------+--------------+-------------+-----------+-------------+------------+
The jobs table primary key is a KSUID, which means that our IDs are both globally unique and sortable by the time they were created. This allows us to effectively kill two birds with one stone: we can query by a single job ID, as well as sort by the time that the job was created, using a single index.
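To illustrate what "a single SQL statement" buys us (this is a sketch, not the Director's real query), fetching the next batch of work in creation order is just an ORDER BY over the primary key, and reprioritizing delivery means editing this one statement rather than rewriting any data:

package jobs

import (
	"context"
	"database/sql"
)

// NextBatch pulls the oldest non-expired jobs for one bucket. Because the
// primary key is a time-sortable KSUID, ORDER BY id is effectively ORDER BY
// creation time. Column names match the schema above; the query is illustrative.
func NextBatch(ctx context.Context, db *sql.DB, bucket []byte) (*sql.Rows, error) {
	return db.QueryContext(ctx,
		`SELECT id, endpoint, headers, payload
		   FROM jobs
		  WHERE bucket = ?
		    AND expire_at > NOW(6)
		  ORDER BY id
		  LIMIT 100`,
		bucket)
}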
Since the median size of the payload and settings for a single job is about 5KB (and can be as large as 750KB), we've done our best to limit reads from and updates to the jobs table.
Under normal operation, the jobs table is immutable and append-only. The golang process responsible for inserting jobs (which we call a Director) keeps a cached version of the payloads and settings in-memory. Most of the time, jobs can be immediately expired from memory after they are delivered, keeping our overall memory footprint low. In production, we set our jobs to expire after 4 hours, with an exponential backoff strategy.
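A minimal sketch of that in-memory working set follows; the type and method names are invented, and the real Director's cache handles sizing and concurrency far more carefully:

package cache

import "sync"

// CachedJob holds the fields the Director needs to (re)send a job without
// reading it back from MySQL. The field names are illustrative.
type CachedJob struct {
	ID       string
	Endpoint string
	Headers  map[string]string
	Payload  []byte
}

// JobCache is the Director's working set: a job is added when it's inserted
// into the JobDB and dropped the moment it reaches a terminal state, keeping
// memory proportional to in-flight work rather than total throughput.
type JobCache struct {
	mu   sync.RWMutex
	jobs map[string]CachedJob
}

func NewJobCache() *JobCache {
	return &JobCache{jobs: make(map[string]CachedJob)}
}

func (c *JobCache) Add(j CachedJob) {
	c.mu.Lock()
	c.jobs[j.ID] = j
	c.mu.Unlock()
}

func (c *JobCache) Get(id string) (CachedJob, bool) {
	c.mu.RLock()
	j, ok := c.jobs[id]
	c.mu.RUnlock()
	return j, ok
}

// Expire removes a delivered, discarded, or archived job so its payload can be freed.
func (c *JobCache) Expire(id string) {
	c.mu.Lock()
	delete(c.jobs, id)
	c.mu.Unlock()
}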
mysql> select id, endpoint, created_at, expire_at from jobs limit 5;
+-----------------------------+-------------------------------------------------------+----------------------------+----------------------------+
| id                          | endpoint                                              | created_at                 | expire_at                  |
+-----------------------------+-------------------------------------------------------+----------------------------+----------------------------+
| 14NKRmQSBbCB5p0LAXWRp47dN3F | centrifuge://integrations/v2/54efbf12db31d978f14aa8b5 | 2018-05-09 16:16:52.525000 | 2018-05-09 20:16:52.876976 |
| 14NKeihjmWdJLpyGi7L7GiJ9UgL | centrifuge://integrations/v2/54521fd725e721e32a72eec6 | 2018-05-09 16:18:34.426000 | 2018-05-09 20:18:35.041901 |
| 14NL91LEZG694NNQEF3UZMgA9yH | centrifuge://integrations/v2/54521fdc25e721e32a72ef04 | 2018-05-09 16:22:35.723000 | 2018-05-09 20:22:36.339480 |
| 14NLF682LBV5LQJWLJCwnBUYB8P | centrifuge://integrations/v2/54521fd525e721e32a72ee91 | 2018-05-09 16:23:24.365000 | 2018-05-09 20:23:25.353897 |
| 14NLQK7R4QfAON8w2pYp1FxkyEe | centrifuge://integrations/v2/54521fd725e721e32a72eec6 | 2018-05-09 16:24:54.317000 | 2018-05-09 20:24:54.857624 |
+-----------------------------+-------------------------------------------------------+----------------------------+----------------------------+
Of course, we also want to keep track of what state each job is in, whether it is waiting to be delivered, in the process of executing, or awaiting retry. For that, we use a separate table, the job_state_transitions table.
A job first enters in the awaiting-scheduling state: it has yet to be executed and delivered to the downstream endpoint.

From there, a job will begin executing, and the result will transition it to one of three separate states.

If the job succeeds (the endpoint returns a 2xx response), Centrifuge marks the job as succeeded. There's nothing more to be done here, and we can expire it from our in-memory cache.

Similarly, if the job fails with a non-retryable error (the endpoint rejects the payload outright), Centrifuge marks the job as discarded. Even if we were to re-send the same job multiple times, the server would keep rejecting it, so we've reached another terminal state.

If the job fails with a retryable error instead, it moves to awaiting-retry and will be attempted again after its backoff delay.

Finally, any jobs which exceed their expiration time transition from awaiting-retry to archiving. Once they are successfully stored on S3, the jobs are transitioned to a terminal archived state.
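Those transitions form a small state machine. Here's a sketch of it in Go; the state names mirror the enum in the table below, and everything else (the map, the helper) is purely illustrative:

package state

// JobState enumerates the states recorded in job_state_transitions.
type JobState string

const (
	AwaitingScheduling JobState = "awaiting-scheduling"
	Executing          JobState = "executing"
	Succeeded          JobState = "succeeded"
	Discarded          JobState = "discarded"
	AwaitingRetry      JobState = "awaiting-retry"
	Archiving          JobState = "archiving"
	Archived           JobState = "archived"
)

// validNext captures the lifecycle described above: every job eventually
// lands in one of three terminal states (succeeded, discarded, or archived).
var validNext = map[JobState][]JobState{
	AwaitingScheduling: {Executing},
	Executing:          {Succeeded, Discarded, AwaitingRetry},
	AwaitingRetry:      {Executing, Archiving},
	Archiving:          {Archived},
}

// CanTransition reports whether moving from one state to another is legal.
func CanTransition(from, to JobState) bool {
	for _, next := range validNext[from] {
		if next == to {
			return true
		}
	}
	return false
}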
mysql> describe job_state_transitions;
+-------------------------+----------------------------------------------------------------------------------------------------------+------+-----+---------+
| Field                   | Type                                                                                                     | Null | Key | Default |
+-------------------------+----------------------------------------------------------------------------------------------------------+------+-----+---------+
| id                      | bigint(20)                                                                                               | NO   | PRI | NULL    |
| job_id                  | binary(27)                                                                                               | NO   | PRI | NULL    |
| time                    | datetime(6)                                                                                              | NO   |     | NULL    |
| retry_at                | datetime(6)                                                                                              | NO   |     | NULL    |
| attempts                | smallint(6)                                                                                              | NO   |     | NULL    |
| state                   | enum('awaiting-scheduling','executing','succeeded','discarded','awaiting-retry','archiving','archived') | NO   |     | NULL    |
| error_type              | varbinary(128)                                                                                           | YES  |     | NULL    |
| error_response          | mediumblob                                                                                               | YES  |     | NULL    |
| error_response_encoding | varbinary(16)                                                                                            | YES  |     | NULL    |
+-------------------------+----------------------------------------------------------------------------------------------------------+------+-----+---------+
Like the jobs table, rows in the job_state_transitions table are also immutable and append-only. Every time a new attempt is made, the attempt number is increased. Upon job execution failure, the retry is scheduled with a retry_at time determined by the retry behavior specified in the job itself.
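The backoff_min_delay_ms and backoff_coefficient columns on the jobs table exist for exactly this purpose. We haven't shown Centrifuge's exact formula here, so treat the following as a plausible sketch of how retry_at could be derived:

package backoff

import (
	"math"
	"time"
)

// NextRetryAt sketches an exponential backoff: the first retry waits the
// job's minimum delay, and each later attempt multiplies that delay by the
// job's coefficient. The real implementation may add jitter or caps.
func NextRetryAt(now time.Time, minDelayMS int, coefficient float64, attempt int) time.Time {
	delay := float64(minDelayMS) * math.Pow(coefficient, float64(attempt-1))
	return now.Add(time.Duration(delay) * time.Millisecond)
}

With a 500 ms minimum delay and a coefficient of 2, for example, retries would land roughly 0.5s, 1s, 2s, 4s, and so on after each failure, until the job crosses its expire_at and is archived instead.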
mysql> show indexes from job_state_transitions;
+-----------------------+------------+----------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
| Table                 | Non_unique | Key_name | Seq_in_index | Column_name | Collation | Cardinality | Sub_part | Packed | Null | Index_type | Comment | Index_comment |
+-----------------------+------------+----------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
| job_state_transitions |          0 | PRIMARY  |            1 | job_id      | A         |     5669206 | NULL     | NULL   |      | BTREE      |         |               |
| job_state_transitions |          0 | PRIMARY  |            2 | id          | A         |    11338413 | NULL     | NULL   |      | BTREE      |         |               |
+-----------------------+------------+----------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+
You can see here in one of our production databases that the first index in the sequence is always on the job_id, which is guaranteed to be globally unique. From there, the incrementing ID ensures that each entry in the transitions table for a single job’s execution is sequential.
To give you a flavor of what this looks like in action, here’s a sample execution trace for a single job pulled from production.
mysql> select id, job_id, attempts, state from job_state_transitions limit 7;
+--------+-----------------------------+----------+---------------------+
| id     | job_id                      | attempts | state               |
+--------+-----------------------------+----------+---------------------+
| 169361 | 14NKRmQSBbCB5p0LAXWRp47dN3F |        0 | awaiting-scheduling |
| 169362 | 14NKRmQSBbCB5p0LAXWRp47dN3F |        1 | executing           |
| 169363 | 14NKRmQSBbCB5p0LAXWRp47dN3F |        1 | awaiting-retry      |
| 169364 | 14NKRmQSBbCB5p0LAXWRp47dN3F |        2 | executing           |
| 169365 | 14NKRmQSBbCB5p0LAXWRp47dN3F |        2 | awaiting-retry      |
| 169366 | 14NKRmQSBbCB5p0LAXWRp47dN3F |        3 | executing           |
| 169367 | 14NKRmQSBbCB5p0LAXWRp47dN3F |        3 | awaiting-retry      |
+--------+-----------------------------+----------+---------------------+
7 rows in set (0.00 sec)
Notice that the job first starts in the awaiting-scheduling state before quickly transitioning to its first delivery attempt. From there, the job consistently fails, so it oscillates between executing and awaiting-retry.
While this trace is certainly useful for internal debugging, the main benefit it provides is the ability to actually surface the execution path for a given event to the end customer. (Stay tuned for this feature, coming soon!)
Up until this point, we've focused exclusively on the data model for our jobs. We've shown how they are stored in our RDS instance, and how the jobs table and job_state_transitions table are both populated.
Traditionally, web services have many readers and writers interacting with a single, centralized database: a stateless application tier backed by any number of sharded databases.
Remember, though, that Segment's workload looks very different from a traditional web service. Our workload is extremely write-heavy, has no reads, and requires no JOINs or query coordination across separate jobs. Instead, our goal is to minimize contention between separate writers to keep the writes as fast as possible.

To do that, we've adopted an architecture where a single Director interacts with a given database. The Director manages all of its caching, locks, and querying in-process. Because the Director is the sole writer, it can manage all of its caching and locking with zero coordination. The only thing a Director needs to coordinate globally is which particular database it is writing to. We call the attached database a JobDB, and what follows is a view into how Directors coordinate to acquire a JobDB and send messages to it.
When a Director first boots up, it goes through the following lifecycle:

Acquire a spare JobDB via Consul — to begin operating, a Director first does a lookup and acquires a lock on the key for a given JobDB. If another Director already holds the lock, the current Director will retry until it finds an available spare JobDB. Consul sessions ensure that a given database is never concurrently written to by multiple Directors: they are mutually exclusive and held by a single writer. Sessions also allow us to lock an entire keyspace, so a Director can freely update the status of its JobDB in Consul while it continues to hold the lock.
import (
	...
	"github.com/segmentio/consul-go"
)
// AcquireLock satisfies the centrifuge.Registry interface.
func (r *Registry) AcquireLock(ctx context.Context, locks ...string) (context.Context, context.CancelFunc) {
	lockKeys := make([]string, len(locks))
	for i, lock := range locks {
		lockKeys[i] = r.lockKey(lock)
	}

	sessionCtx, cancel := consul.WithSession(ctx, consul.Session{
		Name:      "centrifuge",
		Behavior:  consul.Delete,
		LockDelay: r.lockDelay,
		TTL:       r.lockTimeout,
	})

	lockCtx, unlock := r.locker.TryLockOne(sessionCtx, lockKeys...)
	if lockCtx.Err() != nil {
		return lockCtx, func() { unlock(); cancel() }
	}

	acquired := lockCtx.Value(consul.LocksKey).([]string)[0]
	for i, lockKey := range lockKeys {
		if lockKey == acquired {
			return context.WithValue(lockCtx, centrifuge.LockKey, locks[i]), func() { unlock(); cancel() }
		}
	}

	unlock()
	cancel()
	panic(fmt.Sprintf("BUG: the lock key acquired by the consul client was not found in the set of lock keys passed to TryLockOne (acquired lock = %s, candidates = %v)", acquired, lockKeys))
}
Connect to the JobDB, and create new tables — once a Director has connected to a spare JobDB, it needs to create the necessary tables within the connected DB.
Rather than use an ORM layer, we've used the standard golang database/sql interface, backed by a MySQL driver. Many of these queries and prepared statements are code-generated, but a handful are handwritten.
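A sketch of what that looks like in practice; the column list comes from the jobs schema earlier, while the function itself stands in for the generated code:

package store

import (
	"context"
	"database/sql"
	"time"
)

// insertJobSQL is prepared once at startup and executed with positional
// arguments, in plain database/sql style rather than through an ORM.
const insertJobSQL = `
INSERT INTO jobs (id, bucket, endpoint, headers, payload,
                  execution_timeout_ms, backoff_min_delay_ms,
                  backoff_coefficient, created_at, expire_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`

func PrepareInsertJob(ctx context.Context, db *sql.DB) (*sql.Stmt, error) {
	return db.PrepareContext(ctx, insertJobSQL)
}

func InsertJob(ctx context.Context, stmt *sql.Stmt,
	id, bucket, endpoint string, headers, payload []byte,
	timeoutMS, minDelayMS int, coefficient float64,
	createdAt, expireAt time.Time) error {
	_, err := stmt.ExecContext(ctx, id, bucket, endpoint, headers, payload,
		timeoutMS, minDelayMS, coefficient, createdAt, expireAt)
	return err
}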
Begin listening for new jobs and register itself in Consul — after the Director has finished creating the necessary tables, it registers itself in Consul so that clients may start sending the Director traffic.
Start executing jobs — once the Director is fully running, it begins accepting jobs. Those jobs are first logged to the paired JobDB; then the Director begins delivering each job to its specified endpoint.
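Putting the four steps together, here's a condensed, purely illustrative sketch of the boot sequence; each interface below stands in for a real component (the Consul registry, the JobDB, and the job server):

package director

import "context"

// Registry stands in for the Consul-backed registry.
type Registry interface {
	AcquireLock(ctx context.Context, keys ...string) (context.Context, context.CancelFunc)
	Register(ctx context.Context) error
}

// JobDB stands in for the paired MySQL database.
type JobDB interface {
	CreateTables(ctx context.Context) error
}

// Director ties the pieces together. Serve accepts, persists, and delivers jobs.
type Director struct {
	Registry Registry
	DB       JobDB
	Serve    func(ctx context.Context) error
}

// Boot follows the lifecycle described above: lock a spare JobDB, create its
// tables, register for traffic, then start executing jobs until shutdown.
func (d *Director) Boot(ctx context.Context, spareDBs ...string) error {
	lockCtx, release := d.Registry.AcquireLock(ctx, spareDBs...) // step 1
	if lockCtx.Err() != nil {
		return lockCtx.Err()
	}
	defer release()

	if err := d.DB.CreateTables(lockCtx); err != nil { // step 2
		return err
	}
	if err := d.Registry.Register(lockCtx); err != nil { // step 3
		return err
	}
	return d.Serve(lockCtx) // step 4
}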
Now that we understand the relationship between Directors and JobDBs, we can look back at the properties of the system (immutable, extremely write-heavy with a small working set, no database JOINs), and understand how Centrifuge is able to quickly absorb traffic.
Under normal operation, the Director rarely has to read from the attached JobDB. Because all jobs are immutable and the Director is the sole writer, it can cache all jobs in-memory and expire them immediately once they are delivered. The only time it needs to read from the database is when recovering from a failure.
Looking at a heap profile from a running Director, we can see that a significant proportion of heap objects do indeed fall into the category of cached jobs:
And thanks to the cache, our writes dominate our reads. Here are example metrics that we pulled from a single active database.
Since all jobs are extremely short-lived (typically only a few hundred milliseconds while they are being executed), we can quickly expire delivered jobs from our cache.
The Director is responsible for accepting new jobs via RPC. When it receives the RPC request, it will go ahead and log those jobs to the attached JobDB, and respond with a transaction ID once the jobs have been successfully persisted. From there, the Director makes requests to all of the specified endpoints, retrying jobs where necessary, and logging all state transitions to the JobDB. If the Director fails to deliver any jobs by their expiration time (4 hours in our case), they are archived on S3 to be re-processed later.
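A rough sketch of that accept-then-deliver flow; the RPC surface, the store interface, and the transaction ID shape are all assumptions rather than Centrifuge's real API:

package rpc

import (
	"bytes"
	"context"
	"net/http"
)

type Job struct {
	ID       string
	Endpoint string
	Payload  []byte
}

// Store stands in for the paired JobDB.
type Store interface {
	// InsertJobs persists a batch and returns an identifier the caller can
	// use as its acknowledgement (a stand-in for the transaction ID above).
	InsertJobs(ctx context.Context, jobs []Job) (txID string, err error)
	RecordTransition(ctx context.Context, jobID, state string) error
}

// Submit persists first, acknowledges, then delivers asynchronously so a
// slow endpoint never blocks the caller. Retry scheduling, discards, and
// archiving are elided from this sketch.
func Submit(ctx context.Context, db Store, client *http.Client, jobs []Job) (string, error) {
	txID, err := db.InsertJobs(ctx, jobs)
	if err != nil {
		return "", err
	}
	for _, j := range jobs {
		go func(j Job) {
			_ = db.RecordTransition(context.Background(), j.ID, "executing")
			resp, err := client.Post(j.Endpoint, "application/json", bytes.NewReader(j.Payload))
			state := "awaiting-retry"
			if err == nil {
				resp.Body.Close()
				if resp.StatusCode < 300 {
					state = "succeeded"
				}
			}
			_ = db.RecordTransition(context.Background(), j.ID, state)
		}(j)
	}
	return txID, nil
}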
Like all of our other services at Segment, the Directors scale themselves up and down based upon CPU usage. If our system starts running under load, ECS adds Directors; if we are over capacity, ECS removes them. However, Centrifuge introduced an interesting new scaling problem for us: we needed to scale our storage layer (individual JobDBs) up and down to match the scaling in our compute layer (instances of Director containers). To do that, we created a separate binary called the JobDB Manager. The Manager's job is to constantly adjust the number of databases to match the number of Directors. It keeps a pool of 'spare' databases around in case we need to scale up suddenly, and it retires old databases during off-peak hours. To keep the 'small working set' even smaller, we cycle these JobDBs roughly every 30 minutes; the Manager cycles a JobDB when the data it holds is about to exceed available RAM.
This cycling of databases ensures that no single database slows down because its working set has grown beyond what fits in RAM.
Instead of issuing a large number of random deletes, we end up batching the deletes into a single drop table for better performance. And if a Director exits and has to restart, it must only read a small amount of data from the JobDB into memory.
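To make the 'drop instead of delete' point concrete, here's a heavily hedged sketch; the real cycling is driven by the Manager and involves draining and retiring whole JobDBs, but the core trick is that expired data disappears with one statement instead of millions of row deletes:

package cycling

import (
	"context"
	"database/sql"
)

// CycleJobsTable points new writes at a fresh table, then removes the old
// one with a single DROP TABLE once every job in it has reached a terminal
// state. Table names and the draining step are assumptions for illustration.
func CycleJobsTable(ctx context.Context, db *sql.DB) error {
	if _, err := db.ExecContext(ctx, `CREATE TABLE jobs_next LIKE jobs`); err != nil {
		return err
	}
	// ... drain: wait for in-flight jobs in the old table to succeed,
	// be discarded, or be archived to S3 ...
	if _, err := db.ExecContext(ctx, `DROP TABLE jobs`); err != nil {
		return err
	}
	_, err := db.ExecContext(ctx, `RENAME TABLE jobs_next TO jobs`)
	return err
}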
Of course, seeing the system operate at 'steady-state' isn't really the most interesting part of Centrifuge. It's designed to absorb traffic in high-load failure scenarios. We had tested many of these scenarios in a staging account, but had yet to really see a third-party outage happen in production. One month after the full rollout, we finally got to observe the system operating in a high-failure state.

At 4:45 pm on March 17th, one of our more popular integrations started reporting high latencies and elevated 500s. Under normal operation, this API receives 16,000 requests per second, which is a fairly significant portion of our outbound traffic load.

From 4:45 pm until 6:30 pm, our monitoring saw a sharp decline in successful calls and steeply degraded performance. The percentage of successful calls dropped to about 15% of the normal traffic load. Here you can see the graph of successful calls in dark red, plotted against the data from one week before as the dashed thin line.
During this time, Centrifuge began to rapidly retry the failed requests. Our exponential backoff strategy started to kick in, and we started attempting to re-send any requests which had failed. Here you can see the request volume to the third-party’s endpoint. Admittedly this strategy still needs some tuning–at peak, we were sending around 100,000 requests per second to the partner’s API.
You can see the requests rapidly start retrying over the first few minutes, but then smooth out as they hit their exponential backoff period. This outage was the first time we'd really demonstrated the true power of Centrifuge. Over a 90-minute period, we managed to absorb about 85 million analytics events in Segment's infrastructure. In the 30 minutes after the outage, the system successfully delivered all of the queued traffic.

Watching the event was incredibly validating. The system worked as anticipated: it scaled up, absorbed the load, and then flushed it once the API had recovered. Even better, our mutual customers barely noticed. A handful saw delays in delivering their data to the third-party tool, but none saw data loss. Best of all, this single outage didn't affect data delivery for any other integrations we support!

All told, there's a lot more we could say about Centrifuge, which is why we're saving a number of the implementation details for further posts in this series. Until then, here's a quick timeline of how Centrifuge came together:
September 22, 2017: Achille gets some exciting ideas for cycling databases. Feverish whiteboarding ensues.
January 12, 2018: We hit a major milestone of 70% of traffic flowing through the system. Tom bows for the camera.
March 14, 2018: We hit a new load test record of 2M messages per second in our "Black Hole" account.
May 22, 2018: Tom, Calvin, Alexandra, and Max take a group picture, since we forgot to earlier. Rick and Achille are traveling.