Skip to content

Latest commit

 

History

History
150 lines (107 loc) · 7.17 KB

ch05Bolts.asc

File metadata and controls

150 lines (107 loc) · 7.17 KB

Bolts

As we have seen, bolts are key components in a Storm cluster. In this chapter, we’ll look at a bolt’s life cycle, some strategies for bolt design and some examples of how to implement them.

Bolt Lifecycle

A bolt is a component that takes tuples as input and produces tuples as output. When writing a bolt, you will usually extend the BaseRichBolt abstract class. Bolts are created on the client machine, serialized into the topology and submitted to the master machine of the cluster. The cluster launches workers that deserialize the bolt, call prepare on it, and then start processing tuples.

Tip
To customize a bolt you should set parameters in its constructor and save them as instance variables, so they will be serialized when submitting the bolt to the cluster.

Bolt Structure

Bolts have the following methods:

  • declareOutputFields(OutputFieldsDeclarer declarer): Declare the output schema for this bolt.

  • prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector): Called just before the bolt starts processing tuples.

  • execute(Tuple input): Process a single tuple of input.

  • cleanup(): Called when a bolt is going to shutdown. This is not guaranteed to be called on the cluster, as the process could be killed.

Let’s take a look at an example of a bolt that will split sentences into words.

class SplitSentence extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context,
        OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
        collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

As you can see, this bolt is very straightforward. It’s worth mentioning that in this example there is no message guarantee. This means that if the bolt discards a message for some reason, either because it goes down or because it was deliberately discarded programmatically, the spout that generated the message will never be notified, and neither will any of the bolts and spouts in between.

It is important to mention that every tuple you process must be acked or failed, using ack or fail methods. Storm uses memory to track each tuple, so if you don’t ack/fail every tuple, the task will eventually run out of memory.

In many cases, you’ll want to guarantee message processing through the entire topology.

Reliable vs Unreliable Bolts

As we said before, Storm can guarantee that each message, sent by a spout, will be fully processed by all bolts. This is a design consideration, meaning that you will need to decide whether you want to provide that or not.

A topology is a tree of nodes in which messages (tuples) travel along one or more branches. Each node will ack(tuple) or fail(tuple) so that Storm knows when a message fails, and notifies the spout or spouts that produced the message.

When a tuple comes off of a spout it is emitted to the SpoutOutputCollector with a message id. For example:

collector.emit(new Values("field1", "field2", 3) , msgId);

Next, the tuple gets sent to consuming bolts and Storm takes care of tracking the tree of messages that is created. When Storm detects that a tuple was fully processed, it will call the ack method on the originating spout task with the message id that the spout provided to Storm. On the other hand, if the tuple times-out storm will call the fail method on the spout.

To benefit from Storm reliability capabilities, first you need to tell Storm when you are creating a new link in the tree of tuples. Second, you need to tell Storm either if you finished processing a tuple or failed doing it. Storm will detect when the tree of tuples is fully processed and will ack or fail the spout tuple appropriately.

When you specify a link in the tuple tree, you are Anchoring the emitted tuples to the origin tuple.

Let’s change the SplitSentence bolt, that we just saw, so that it guarantees message processing.

class SplitSentence extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map stormConf, TopologyContext context,
        OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(tuple, new Values(word));
        }
        collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

The exact line where the anchoring happens is collector.emit(tuple, new Values(word));. As we mentioned above, passing along the tuple enables Storm to keep track of the originating spout. collector.ack(tuple) and collector.fail(tuple) tell Storm what happened to each tuple, and Storm can tell to the spout to ack or fail. Storm considers a tuple coming of a spout fully processed when every message in the tree has been processed. And a tuple is considered failed when its tree of messages fails to be fully processed within a configurable timeout. The default is 30 seconds.

Tip
You can change this timeout changing the Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS configuration on the topology.

Of course, the spout needs to take care of the case when a message fails and retry or discard the message accordingly.

Multiple Anchoring

To use a bolt to join or aggregate streams you’ll need to buffer tuples in memory. In order to message guarantee, in this scenario, you have to anchor the stream to more than one tuple. This is done by calling emit with a List of tuples.

...
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, values);
...

That way, any time a bolt acks or fails, it notifies the root tuple of each tuple tree.

Multiple streams

A bolt can emit tuples to multiple streams using emit(streamId, tuple), where streamId is a string that identifies the stream. Then in the TopologyBuilder you can decide which stream to subscribe to.

Using BaseBasicBolt to do acking automatically

As you probably noticed, there are lots of use cases in which you need message guarantees. To make things easier, Storm provides another abstract class for bolts called BaseBasicBolt, which encapsulates the pattern of calling ack right after the execute method.

class SplitSentence extends BaseBasicBolt {

    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
Tip
Tuples emitted to BasicOutputCollector are automatically anchored to the input tuple.