Secondly, very often we don't need to deduce patterns over long periods of time. Of the petabytes of incoming data collected over months, at any given moment, we might not need to take into account all of it, just a real-time snapshot. Perhaps we don't need to know the longest trending hashtag over five years, but just the one right now.
This is what Storm is built for, to accept tons of data coming in extremely fast, possibly from various sources, analyze it and publish the real-time updates to a UI or some other place without storing any itself.
This article is not the be-all and end-all of Storm guides, nor is it meant to be. Storm's pretty huge, and just one long-read probably can't do it justice anyway. Of course, any additions, feedback or constructive criticism will be greatly appreciated.
The architecture of Storm can be compared to a network of roads connecting a set of checkpoints. Traffic begins at a certain checkpoint (called a spout) and passes through other checkpoints (called bolts).
The traffic is of course the stream of data that is retrieved by the spout (from a data source, a public API for example) and routed to various bolts where the data is filtered, sanitized, aggregated, analyzed, sent to a UI for people to view or any other target.
The network of spouts and bolts is called a topology, and the data flows in the form of tuples (lists of values that may have different types).
One important thing to talk about is the direction of the data traffic. Conventionally, we would have one or multiple spouts reading the data from an API, a Kafka topic or some other queuing system. The data would then flow one-way to one or multiple bolts which may forward it to other bolts and so on.
Bolts may publish the analyzed data to a UI or to another bolt. But the traffic is almost always unidirectional, like a DAG. Although it is certainly possible to make cycles, we're unlikely to need such a convoluted topology.
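Installing Storm locally involves a number of steps, which you're free to follow on your machine. But later on I'll be using Docker containers for a Storm cluster deployment and the images will take care of setting up everything we need.
Here's a simple spout that emits random digits:
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
public class RandomDigitSpout extends BaseRichSpout
{
    // To output tuples from spout to the next stage bolt
    SpoutOutputCollector collector;
    // Called once when the spout starts; keep the collector for nextTuple().
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {
        this.collector = collector;
    }
    public void nextTuple()
    {
        int randomDigit = ThreadLocalRandom.current().nextInt(0, 10);
        // Emit the digit to the next stage bolt
        collector.emit(new Values(randomDigit));
    }
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer)
    {
        // Tell Storm the schema of the output tuple for this spout.
        // It consists of a single column called 'random-digit'.
        outputFieldsDeclarer.declare(new Fields("random-digit"));
    }
}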
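import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class EvenDigitBolt extends BaseRichBolt
{
    // To output tuples from this bolt to the next bolt.
    OutputCollector collector;
    // Called once when the bolt starts; keep the collector for execute().
    public void prepare(Map conf, TopologyContext context, OutputCollector collector)
    {
        this.collector = collector;
    }
    public void execute(Tuple tuple)
    {
        // Get the 1st column 'random-digit' from the tuple
        int randomDigit = tuple.getInt(0);
        if (randomDigit % 2 == 0) {
            collector.emit(new Values(randomDigit));
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        // Tell Storm the schema of the output tuple for this bolt.
        // It consists of a single column called 'even-digit'
        declarer.declare(new Fields("even-digit"));
    }
}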
Another simple bolt that'll receive the filtered stream from EvenDigitBolt, and just multiply each even digit by 10 and emit it forward:
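import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class MultiplyByTenBolt extends BaseRichBolt
{
    OutputCollector collector;
    public void prepare(Map conf, TopologyContext context, OutputCollector collector)
    {
        this.collector = collector;
    }
    public void execute(Tuple tuple)
    {
        // Get 'even-digit' from the tuple.
        int evenDigit = tuple.getInt(0);
        collector.emit(new Values(evenDigit * 10));
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer)
    {
        declarer.declare(new Fields("even-digit-multiplied-by-ten"));
    }
}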
package packagename;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
public class OurSimpleTopology {
    public static void main(String[] args) throws Exception
    {
        // Create the topology
        TopologyBuilder builder = new TopologyBuilder();
        // Attach the random digit spout to the topology.
        // Use just 1 thread for the spout.
        builder.setSpout("random-digit-spout", new RandomDigitSpout());
        // Connect the even digit bolt to our spout.
        // The bolt will use 2 threads and the digits will be randomly
        // shuffled/distributed among the 2 threads.
        // The third parameter is formally called the parallelism hint.
        builder.setBolt("even-digit-bolt", new EvenDigitBolt(), 2)
               .shuffleGrouping("random-digit-spout");
        // Connect the multiply-by-10 bolt to our even digit bolt.
        // This bolt will use 4 threads, among which data from the
        // even digit bolt will be shuffled/distributed randomly.
        builder.setBolt("multiplied-by-ten-bolt", new MultiplyByTenBolt(), 4)
               .shuffleGrouping("even-digit-bolt");
        // Create a configuration object.
        Config conf = new Config();
        // The number of independent JVM processes this topology will use.
        conf.setNumWorkers(2);
        // Submit our topology with the configuration.
        StormSubmitter.submitTopology("our-simple-topology", conf, builder.createTopology());
    }
}
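To get a feel for what actually flows through this topology, the same filter-then-multiply logic can be sketched in plain Java, without any Storm classes. This is only an illustration: the class name DigitPipelineSketch, its two helper methods and the fixed batch of digits are made up for this sketch, and the real topology processes an endless stream across several threads rather than one list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for the topology's data flow (hypothetical, no Storm involved).
class DigitPipelineSketch {

    // Mirrors EvenDigitBolt: pass on only the even digits.
    static List<Integer> keepEven(List<Integer> digits) {
        List<Integer> out = new ArrayList<>();
        for (int d : digits) {
            if (d % 2 == 0) {
                out.add(d);
            }
        }
        return out;
    }

    // Mirrors MultiplyByTenBolt: multiply every incoming digit by 10.
    static List<Integer> timesTen(List<Integer> digits) {
        List<Integer> out = new ArrayList<>();
        for (int d : digits) {
            out.add(d * 10);
        }
        return out;
    }

    public static void main(String[] args) {
        // A fixed batch standing in for what RandomDigitSpout might emit.
        List<Integer> emitted = Arrays.asList(3, 8, 0, 5, 6);
        System.out.println(timesTen(keepEven(emitted))); // prints [80, 0, 60]
    }
}
```

The odd digits never reach the second stage, which is exactly what EvenDigitBolt does by simply not emitting them.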
In our example, RandomDigitSpout will launch just one thread, and the data spewed from that thread will be distributed among 2 threads of the EvenDigitBolt.
But the way this distribution happens, referred to as the stream grouping, can be important. For example you may have a stream of temperature recordings from two cities, where the tuples emitted by the spout look like this:
// City name, temperature, time of recording
("Atlanta", 94, "2018-05-11 23:14")
("New York City", 75, "2018-05-11 23:15")
("New York City", 76, "2018-05-11 23:16")
("Atlanta", 96, "2018-05-11 23:15")
("New York City", 77, "2018-05-11 23:17")
("Atlanta", 95, "2018-05-11 23:16")
("New York City", 76, "2018-05-11 23:18")
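Suppose we want to compute the average temperature per city. Then all the tuples for a given city have to reach the same bolt thread, or each thread would only see part of that city's readings. A fields grouping serves this purpose: it partitions data among the threads by the value of the field specified in the grouping: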
// The tuples with the same city name will go to the same thread.
builder.setBolt("avg-temp-bolt", new AvgTempBolt(), 2)
.fieldsGrouping("temp-spout", new Fields("city_name"));
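The idea behind a fields grouping can be sketched in a few lines of plain Java: derive the target thread from the grouping field's value, so equal values always land in the same place. Treat this as a simplified model rather than Storm's actual code; the class FieldsGroupingSketch and the method targetThread are hypothetical names, and hashing with a modulo is an assumption about how such a partitioner typically works.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a fields grouping; not Storm's own implementation.
class FieldsGroupingSketch {

    // Pick a target thread from the value of the grouping field.
    // Assumed simplification: hash the value, then take it modulo the thread count.
    static int targetThread(String cityName, int numThreads) {
        return Math.floorMod(cityName.hashCode(), numThreads);
    }

    public static void main(String[] args) {
        List<String> cities = Arrays.asList("Atlanta", "New York City", "New York City", "Atlanta");
        for (String city : cities) {
            // The same city name always maps to the same thread index.
            System.out.println(city + " -> thread " + targetThread(city, 2));
        }
    }
}
```

Note that nothing guarantees the two cities end up on different threads. The grouping only promises that one city never gets split across threads.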
And of course there are other groupings as well. For most cases, though, the grouping probably won't matter much and you can just shuffle the data and throw it among the bolt threads randomly (shuffle grouping).
Now there's another important component to this: the number of worker processes that our topology will run on. The total number of threads that we specified will then be equally divided among the worker processes.
So in our example random digit topology we had 1 spout thread, 2 even-digit bolt threads and 4 multiply-by-ten bolt threads (7 total). Each of the 2 worker processes would be responsible for running 2 multiply-by-ten bolt threads, 1 even-digit bolt and one of the processes will run the 1 spout thread.
On top of these, Storm by default adds one acker executor per worker process to track whether tuples get fully processed, which means 2 more threads here. So all in all we'll have 9 threads. These are collectively called executors.
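The arithmetic for spreading the 7 spout and bolt threads over the 2 workers can be sketched as a simple round-robin split. This is just an illustration of the division described above; the class ExecutorSplitSketch and its split method are hypothetical, and Storm's real scheduler takes more into account than a plain counter.

```java
import java.util.Arrays;

// Hypothetical illustration of dividing threads among worker processes.
class ExecutorSplitSketch {

    // Spread `executors` threads over `workers` processes as evenly as possible.
    static int[] split(int executors, int workers) {
        int[] perWorker = new int[workers];
        for (int i = 0; i < executors; i++) {
            perWorker[i % workers]++;
        }
        return perWorker;
    }

    public static void main(String[] args) {
        int spout = 1, evenDigit = 2, multiplyByTen = 4;
        int[] perWorker = split(spout + evenDigit + multiplyByTen, 2);
        System.out.println(Arrays.toString(perWorker)); // prints [4, 3]
    }
}
```

One worker ends up with 4 threads and the other with 3, which matches the picture of one process carrying the lone spout thread.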
It's important to realize that if you set a spout's parallelism hint > 1 (i.e. multiple executors), you can end up emitting the same data several times. Say, the spout reads from the public Twitter stream API and uses two executors. That means that the bolts receiving the data from the spout will get the same tweet twice. It is only after the spout emits the tuples that data parallelism comes into play, i.e. after the tuples get divided among the bolts according to the specified stream grouping.
Running multiple workers on a single node would be fairly pointless. Later, however, we'll use a proper, distributed, multi-node cluster and see how workers are divided on different nodes.
yourproject/
    pom.xml
    src/
        jvm/
            packagename/
                RandomDigitSpout.java
                EvenDigitBolt.java
                MultiplyByTenBolt.java
                OurSimpleTopology.java
Maven is commonly used for building Storm topologies, and it requires a pom.xml file (the POM) that describes the project and its dependencies. Getting into the nitty-gritty of the POM would probably be overkill here.
First, run
mvn clean
inside yourproject to clear any compiled files we may have, making sure to compile each module from scratch. Then run
mvn package
to compile our code and package it in an executable JAR file, inside a newly created target folder. This might take quite a few minutes the first time, especially if your topology has many dependencies. Finally, submit the topology:
storm jar target/packagename-{version number}.jar packagename.OurSimpleTopology
The master node runs the Storm Nimbus daemon and the Storm UI. The slave nodes run the Storm Supervisor daemons. A ZooKeeper daemon on a separate node is used for coordination among the master node and the slave nodes. ZooKeeper, by the way, is only used for cluster management and never any kind of message passing.
It's not like spouts and bolts are sending data to each other through it or anything like that. The Nimbus daemon finds available Supervisors via ZooKeeper, to which the Supervisor daemons register themselves. ZooKeeper also takes care of other managerial tasks, some of which will become clear shortly.
Our topology is submitted to the Nimbus daemon on the master node and then distributed among the worker processes running on the slave/supervisor nodes. Because of ZooKeeper, it doesn't matter how many slave/supervisor nodes you run initially, as you can always seamlessly add more and Storm will automatically integrate them into the cluster.
Whenever we start a Supervisor it allocates a certain number of worker processes (that we can configure) which can then be used by the submitted topology. So in the image above there are a total of 5 allocated workers. And remember this line?
conf.setNumWorkers(5)
This means that the topology will try to use a total of 5 workers. And since our two Supervisor nodes have a total of 5 allocated workers, each of the 5 allocated worker processes will run a part of the topology. If we had done:
conf.setNumWorkers(4)
then one worker process would have remained idle/unused. If the number of specified workers was 6 and the total allocated workers were 5, then because of the limitation only 5 actual topology workers would've been functional.
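The three cases above boil down to taking a minimum. Here is a tiny sketch of that bookkeeping, with the hypothetical names WorkerSlotsSketch, usedWorkers and idleWorkers; it only models the counting described in the text, not how Storm actually assigns slots.

```java
// Hypothetical bookkeeping for requested vs. allocated workers.
class WorkerSlotsSketch {

    // Workers a topology actually gets: it cannot use more than are allocated.
    static int usedWorkers(int requested, int allocated) {
        return Math.min(requested, allocated);
    }

    // Allocated workers left idle after the topology took its share.
    static int idleWorkers(int requested, int allocated) {
        return allocated - usedWorkers(requested, allocated);
    }

    public static void main(String[] args) {
        int allocated = 5;
        for (int requested : new int[] {5, 4, 6}) {
            System.out.println("requested=" + requested
                    + " used=" + usedWorkers(requested, allocated)
                    + " idle=" + idleWorkers(requested, allocated));
        }
    }
}
```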
Before we set this all up using Docker, a few important things to keep in mind regarding fault-tolerance:
Note: In most Storm clusters, the Nimbus itself is never deployed as a single instance but as a cluster. If this fault-tolerance is not incorporated and our sole Nimbus goes down, we lose the ability to submit new topologies or to reassign work away from failed nodes, although topologies that are already running keep going. For simplicity, our illustrative cluster will use a single instance. Similarly, ZooKeeper is very often deployed as a cluster but we'll use just one node.
zookeeper/
    Dockerfile
storm-nimbus/
    Dockerfile
    storm.yaml
    code/
        pom.xml
        src/
            jvm/
                coincident_hashtags/
                    ExclamationTopology.java
storm-supervisor/
    Dockerfile
    storm.yaml
docker-compose.yml
And our docker-compose.yml:
version: '3.2'
services:
    zookeeper:
        build: ./zookeeper
        # Keep it running.
        tty: true
    storm-nimbus:
        build: ./storm-nimbus
        # Run this service after 'zookeeper' and make 'zookeeper' reference.
        links:
            - zookeeper
        tty: true
        # Map port 8080 of the host machine to 8080 of the container.
        # To access the Storm UI from our host machine.
        ports:
            - 8080:8080
        volumes:
            - './storm-nimbus:/theproject'
    storm-supervisor:
        build: ./storm-supervisor
        links:
            - zookeeper
            - storm-nimbus
        tty: true
# Host volume used to store our code on the master node (Nimbus).
volumes:
    storm-nimbus:
The storm.yaml files override certain default configurations for the Storm installations. The line ADD storm.yaml /conf inside the Nimbus and Supervisor Dockerfiles puts them inside the containers where Storm can read them.
storm-nimbus/storm.yaml:
# The Nimbus needs to know where the Zookeeper is. This specifies the list of the
# hosts in the Zookeeper cluster. We're using just one node, of course.
# 'zookeeper' is the Docker Compose network reference.
storm.zookeeper.servers:
- "zookeeper"
storm-supervisor/storm.yaml:
# Telling the Supervisor where the Zookeeper is.
storm.zookeeper.servers:
- "zookeeper"
# The worker nodes need to know which machine(s) are the candidate of master
# in order to download the topology jars.
nimbus.seeds : ["storm-nimbus"]
# For each Supervisor, we configure how many workers run on that machine.
# Each worker uses a single port for receiving messages, and this setting
# defines which ports are open for use. We define four ports here, so Storm will
# allocate up to four workers to run on this node.
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
docker-compose up
After all the images have been built and all the services have started, open a new terminal and type
docker ps
to list the running containers. Then SSH into the Nimbus container using its name:
docker exec -it coincidenthashtagswithapachestorm_storm-nimbus_1 bash
And start the Nimbus daemon:
storm nimbus
Similarly, open another terminal, SSH into the Nimbus again and launch the UI using
storm ui
Go to localhost:8080 in your browser and you'll see a nice overview of our cluster.
The Free slots in the Cluster Summary indicate how many total workers (on all Supervisor nodes) are available & waiting for a topology to consume them. Used Slots indicate how many of the total are currently busy with a topology.
Since we haven't launched any Supervisors yet, they're both zero. We'll get to Executors and Tasks later. Also, as we can see, no topologies have been submitted yet.
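Now let's SSH into the Supervisor container on yet another terminal and start the Supervisor daemon:
docker exec -it coincidenthashtagswithapachestorm_storm-supervisor_1 bash
storm supervisor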
Note: Any changes in our cluster may take a few seconds to reflect on the UI.
We have a new running Supervisor which comes with four allocated workers. These four workers are the result of specifying four ports in our storm.yaml for the Supervisor node. Of course, they're all free (four Free slots). Let's submit a topology to the Nimbus and put them to work.
SSH into the Nimbus on a new terminal. I've written the Nimbus Dockerfile so that we land on our working (landing) directory /theproject. Inside this is the code directory, where our topology resides.
It uses a spout that generates random words and a bolt that just appends three exclamation marks to the words. Two of these bolts are added back-to-back and so at the end of the stream we'll get words with six exclamation marks. It also specifies that it needs three workers (conf.setNumWorkers(3)).
public static void main(String[] args) throws Exception
{
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("word", new TestWordSpout(), 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
    Config conf = new Config();
    // Turn on debugging mode
    conf.setDebug(true);
    conf.setNumWorkers(3);
    StormSubmitter.submitTopology("exclamation-topology", conf, builder.createTopology());
}
cd code
mvn clean
mvn package
storm jar target/coincident-hashtags-1.2.1.jar coincident_hashtags.ExclamationTopology
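And 10 word spout threads + 3 exclaim1 bolt threads + 2 exclaim2 bolt threads + the 3 acker threads Storm adds by default (one per worker) = total of 18 executors. And you might've noticed something new: tasks. A task is an instance of a spout or bolt that runs inside an executor. By default each executor runs exactly one task, but we can ask for more: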
// Each of the two executors (threads) of this bolt will instantiate
// two objects of this bolt (total 4 bolt objects/tasks).
builder.setBolt("even-digit-bolt", new EvenDigitBolt(), 2)
.setNumTasks(4)
.shuffleGrouping("random-digit-spout");
Anyways, returning from that slight detour, let's see an overview of our topology. Click on the name under Topology Summary and scroll down to Worker Resources:
We can clearly see the division of our executors (threads) among the 3 workers. And of course all the 3 workers are on the same, single Supervisor node we're running.
Now, let's scale out!
docker-compose scale storm-supervisor=2
docker exec -it coincidenthashtagswithapachestorm_storm-supervisor_2 bash
storm supervisor
We see two unique Supervisor IDs, both running on different nodes, and all our executors pretty evenly divided among them. This is great. But Storm comes with another nifty way of doing so while the topology is running. Something called rebalancing. On the Nimbus we'd run:
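storm rebalance exclamation-topology -n 6
to change the number of worker processes to 6. Or, to change the number of executors for a particular component, here a bolt named even-digit-bolt:
storm rebalance exclamation-topology -e even-digit-bolt=3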
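This is where tasks start to matter. The number of tasks of a component stays fixed for the lifetime of a topology, while the number of executors can change, so a rebalance just respreads the existing tasks over more or fewer threads, and a component can't usefully have more executors than tasks. Here is a rough sketch of that arithmetic for the even-digit bolt with its 4 tasks. The class TaskSpreadSketch and the round-robin spread are assumptions made for illustration, not Storm's actual assignment code, and rejecting the over-sized case with an exception is a choice of this sketch only.

```java
import java.util.Arrays;

// Hypothetical illustration of spreading a fixed number of tasks over executors.
class TaskSpreadSketch {

    // Number of tasks each executor runs when `tasks` bolt instances
    // are spread over `executors` threads.
    static int[] tasksPerExecutor(int tasks, int executors) {
        if (executors > tasks) {
            // This sketch simply rejects more threads than tasks.
            throw new IllegalArgumentException("more executors than tasks");
        }
        int[] counts = new int[executors];
        for (int t = 0; t < tasks; t++) {
            counts[t % executors]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Before the rebalance: 4 tasks on 2 executors.
        System.out.println(Arrays.toString(tasksPerExecutor(4, 2))); // prints [2, 2]
        // After rebalancing to 3 executors.
        System.out.println(Arrays.toString(tasksPerExecutor(4, 3))); // prints [2, 1, 1]
    }
}
```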
One question we haven't tackled is about what happens if a bolt fails to process a tuple. Well, Storm provides us a mechanism using which the originating spout (specifically the task) can replay the failed tuple. This processing guarantee doesn't just happen by itself, it's a conscious design choice and does add latency.
Spouts send out tuples to bolts, which emit tuples derived from the input tuples to other bolts and so on. That one, original tuple spurs an entire tree of tuples. If any child tuple, so to speak, of the original one fails then any remedial steps (rollbacks etc) may well have to be taken at multiple bolts. That could get pretty hairy, and so what Storm does is that it allows the original tuple to be emitted again right from the source (the spout).
Consequently, any operations performed by bolts that are a function of the incoming tuples should be idempotent. A tuple is considered "fully processed" when every tuple in its tree has been processed, and every tuple has to be explicitly acknowledged by the bolts. However, that's not all. There's another thing to be done explicitly: maintain a link between the original tuple and its child tuples.
Storm will then be able to trace the origin of the child tuples and thus be able to replay the original tuple. This is called anchoring, and it has been done in the ExclamationBolt:
// ExclamationBolt
// 'tuple' is the original one received from the test word spout.
// It's been anchored to/with the tuple going out.
_collector.emit(tuple, new Values(exclamatedWord.toString()));
// Explicitly acknowledge that the tuple has been processed.
_collector.ack(tuple);
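How does Storm notice that a whole tree is done without remembering every tuple in it? As far as the Storm documentation describes it, the acker keeps a single 64-bit value per spout tuple and XORs into it the ID of every tuple that is created or acked in the tree; since each ID goes in twice, the value returns to zero exactly when everything has been acked. Below is a minimal sketch of that idea. The class AckTrackerSketch, its methods and the small hand-picked IDs are hypothetical (Storm uses random 64-bit IDs), so take it as a model of the trick rather than the real acker.

```java
// Hypothetical model of XOR-based tracking of a tuple tree.
class AckTrackerSketch {
    private long ackVal = 0L;

    // A tuple joined the tree (it was emitted, anchored to the original).
    void emitted(long tupleId) {
        ackVal ^= tupleId;
    }

    // A bolt acknowledged the tuple.
    void acked(long tupleId) {
        ackVal ^= tupleId;
    }

    // Every ID has been XORed in twice, so the value is back to zero.
    boolean fullyProcessed() {
        return ackVal == 0L;
    }

    public static void main(String[] args) {
        AckTrackerSketch tracker = new AckTrackerSketch();
        tracker.emitted(11L);           // original tuple from the spout
        tracker.emitted(22L);           // child emitted by the first bolt
        tracker.acked(11L);             // first bolt acks its input
        System.out.println(tracker.fullyProcessed()); // prints false
        tracker.acked(22L);             // second bolt acks the child
        System.out.println(tracker.fullyProcessed()); // prints true
    }
}
```

This also shows why forgetting to ack a single tuple keeps the whole tree open: the value never gets back to zero.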
The ack call will result in the ack method on the spout being called, if it has been implemented. So, say, you're reading the tuple data from some queue and you can only take it off the queue if the tuple has been fully processed. Well, the ack method is where you'd do that. You can also emit out tuples without anchoring:
_collector.emit(new Values(exclamatedWord.toString()));
A tuple can fail in two ways: a bolt can fail it explicitly by calling _collector.fail(tuple), or it can time out (after 30 seconds by default) without having been fully acknowledged. In both these cases, the fail method on the spout will be called, if it is implemented. And if we want the tuple to be replayed, it would have to be done explicitly in the fail method by calling emit, just like in nextTuple(). When tracking tuples, every one has to be acked or failed. Otherwise, the topology will eventually run out of memory.
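To make the spout's side of this concrete, here is a plain-Java sketch of the bookkeeping a replaying spout needs: remember each emitted message under an ID until it is acked, and put it back in line when it fails. Everything here is hypothetical (the class ReplayingSourceSketch, the in-memory queue, the counter used for IDs) and no Storm classes are involved. In a real spout, as far as I know, the ID is what you pass as the second argument to collector.emit(values, messageId), and Storm hands it back to ack and fail; without a message ID the tuple isn't tracked at all.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of a spout's ack/fail bookkeeping; not a Storm class.
class ReplayingSourceSketch {
    private final Queue<String> queue = new ArrayDeque<>();
    private final Map<Long, String> pending = new HashMap<>();
    private long nextId = 0L;

    ReplayingSourceSketch(String... messages) {
        for (String message : messages) {
            queue.add(message);
        }
    }

    // Like nextTuple(): hand out the next message and remember it until acked.
    // Returns the message ID, or -1 when there is nothing to emit.
    long nextTuple() {
        String message = queue.poll();
        if (message == null) {
            return -1L;
        }
        long id = nextId++;
        pending.put(id, message);
        return id;
    }

    // Like ack(msgId): the tuple tree completed, so the message can be forgotten.
    void ack(long id) {
        pending.remove(id);
    }

    // Like fail(msgId): queue the message again so it gets replayed.
    void fail(long id) {
        String message = pending.remove(id);
        if (message != null) {
            queue.add(message);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    int queuedCount() {
        return queue.size();
    }
}
```

This sketch re-queues a failed message and lets the next nextTuple() call emit it again, instead of emitting straight from fail. Either way, the pending map is the thing that grows without bound if tuples are never acked or failed.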
It's also important to know that you have to do all of this yourself when writing custom spouts and bolts. But the Storm core can help. For example, a bolt extending BaseBasicBolt does acking automatically. Or built-in spouts for popular data sources like Kafka take care of queuing and replay logic after acknowledgment and failure.
Define your bolts logically, one per indivisible task, and keep them light and efficient. Similarly, your spouts' nextTuple() methods should be optimized.
Use the Storm UI effectively. By default it doesn't show us the complete picture, only 5% of the total tuples emitted. To monitor all of them use config.setStatsSampleRate(1.0d). Keep an eye on the Acks and Latency values for individual bolts and topologies via the UI, that's what you want to look at when turning the knobs.