Using non-JVM languages with Storm

Sometimes we want to use languages that aren’t based on the JVM to implement a Storm project, either because we feel more comfortable with another language, or to be able to use a library written in that language.

Storm is implemented in Java, and all the spouts and bolts that you’ve seen in this book were written in Java as well. So is it possible to use languages like Python, Ruby or even JavaScript to write spouts and bolts?

The answer to this question is yes! It is possible using something called the multilang protocol.

The multilang protocol is a special protocol implemented in Storm that uses standard input and standard output as a channel of communication with a process that does the job of a spout or a bolt. Messages are passed through this channel either JSON encoded or as lines of plain text.

Let’s take a look at a simple example of a spout and a bolt in a non-JVM language. We’ll have a spout that generates numbers from 1 to 10,000 and a bolt that filters for prime numbers, both written in PHP.

Tip
In this example we check for prime numbers in a naive way. There are much better implementations, but they are also more complex and out of the scope of this example.
Tip
There is an official implementation of a PHP DSL for Storm. In this chapter we’ll show our implementation as an example.

First of all we define our topology.

...
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("numbers-generator", new NumberGeneratorSpout(1, 10000));
builder.setBolt("prime-numbers-filter", new PrimeNumbersFilterBolt()).shuffleGrouping("numbers-generator");
StormTopology topology = builder.createTopology();
...
Tip
There is a way to specify topologies in a non-JVM language. Since Storm topologies are just Thrift structures, and Nimbus is a Thrift daemon, you can create and submit topologies in any language you want. But this is out of the scope of this book.

Nothing new here. Let’s see the implementation of NumberGeneratorSpout.

public class NumberGeneratorSpout extends ShellSpout implements IRichSpout {
    public NumberGeneratorSpout(Integer from, Integer to) {
        super("php", "-f", "NumberGeneratorSpout.php", from.toString(), to.toString());
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

As you have probably noticed, this spout extends ShellSpout. This is a special class that comes with Storm and helps us run and control spouts written in other languages. In this case it tells Storm how to execute our PHP script.

The NumberGeneratorSpout PHP script emits tuples to the standard output, and reads standard input to process acks or fails.

The spout generates sequential numbers counting from the from parameter up to the to parameter, both passed to the constructor. We’ll go over the NumberGeneratorSpout.php script itself once we have looked at the multilang protocol in more detail.

Next, let’s look at PrimeNumbersFilterBolt. Like the spout class, it is only a thin shell that tells Storm how to execute our PHP script. Storm provides a special class for this purpose called ShellBolt; all we have to do is indicate how to run the script and declare the fields that it emits.

public class PrimeNumbersFilterBolt extends ShellBolt implements IRichBolt {
    public PrimeNumbersFilterBolt() {
        super("php", "-f", "PrimeNumbersFilterBolt.php");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }
}

In the constructor we just tell Storm how to run the PHP script. This is the equivalent of the following bash command:

php -f PrimeNumbersFilterBolt.php

The PrimeNumbersFilterBolt PHP script reads tuples from standard input, processes them, and emits, acks or fails to standard output.
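Conceptually, the constructor arguments describe a child process whose standard input and standard output are pipes. The sketch below is not Storm's implementation; it only illustrates the idea with the JDK's ProcessBuilder class. The class name ScriptLauncher and the method describe are hypothetical names chosen for this example.

```java
// Hypothetical helper, NOT part of Storm: shows what "running the script"
// amounts to, using only the JDK.
final class ScriptLauncher {
    // Builds (but does not start) a description of the child process.
    static ProcessBuilder describe(String... command) {
        ProcessBuilder pb = new ProcessBuilder(command);
        // Pipes are already the default; they are set explicitly here to
        // stress that stdin and stdout are the communication channel.
        pb.redirectInput(ProcessBuilder.Redirect.PIPE);
        pb.redirectOutput(ProcessBuilder.Redirect.PIPE);
        return pb;
    }

    public static void main(String[] args) {
        ProcessBuilder pb = describe("php", "-f", "PrimeNumbersFilterBolt.php");
        // Prints the command line that would be executed.
        System.out.println(String.join(" ", pb.command()));
    }
}
```

Calling start() on the returned builder would launch the script; the sketch stops short of that so it can run on a machine without PHP installed.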

Before going over the implementation of the PrimeNumbersFilterBolt.php script, let’s look in more detail at how the multilang protocol works.

Multilang Protocol Specification

The protocol relies on standard input and standard output as a channel of communication between processes. There is a list of steps a script needs to follow in order to work.

  1. Initial handshake

  2. Start looping, and read or write tuples.

Tip
There is a special way of logging from your script that uses Storm’s built-in logging mechanism, so you don’t need to implement your own logging system.

Let’s take a look at the detail of each of these steps, and how to implement it using a PHP script.

Initial handshake

In order to be able to control the process (to start and stop it), Storm needs to know the process ID (PID) of the script it is executing. According to the multilang protocol, the first thing that happens when our process starts is that Storm sends a JSON object with the Storm configuration, the topology context, and a PID directory to standard input. Something like this:

{
    "conf": {
        "topology.message.timeout.secs": 3,
        // etc
    },
    "context": {
        "task->component": {
            "1": "example-spout",
            "2": "__acker",
            "3": "example-bolt"
        },
        "taskid": 3
    },
    "pidDir": "..."
}

The process must create an empty file inside the directory specified by pidDir, whose name is the process ID, and write the PID to standard output as a JSON object.

{"pid": 1234}

So for example, if pidDir is /tmp/example and the PID of our script is 123, we should create an empty file at /tmp/example/123, and print the lines {"pid": 123}\n and end\n to standard output. This is how Storm keeps track of the PID and kills the process when it shuts down. Let’s see how to do it in PHP.

$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];

$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();
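The handshake is not specific to PHP. For comparison, here is a minimal sketch of the same two steps in Java, under some assumptions: the pidDir value has already been extracted from the JSON that Storm sent (the JDK has no built-in JSON parser), and the class and method names (HandshakeSketch, handshake) are hypothetical. ProcessHandle requires Java 9 or later.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch of the script side of the handshake.
final class HandshakeSketch {
    // Creates the empty PID file inside pidDir and returns the reply that
    // has to be written to standard output, including the "end" line.
    static String handshake(String pidDir, long pid) {
        Path pidFile = Paths.get(pidDir, Long.toString(pid));
        try {
            if (!Files.exists(pidFile)) {
                Files.createFile(pidFile);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return "{\"pid\": " + pid + "}\nend\n";
    }

    public static void main(String[] args) {
        // The temp directory stands in for the pidDir that Storm would send.
        String pidDir = System.getProperty("java.io.tmpdir");
        long pid = ProcessHandle.current().pid(); // Java 9+
        System.out.print(handshake(pidDir, pid));
        System.out.flush();
    }
}
```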

We’ve created a function called read_msg to handle reading messages from standard input. The multilang protocol states that messages can be either a single line, or multiple lines encoded in JSON. A message is complete when Storm sends a single line with the word end\n.

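function read_msg() {
    $msg = "";
    while (true) {
        $l = fgets(STDIN);
        if ($l === false) {
            // Storm closed the pipe, so there is nothing left to read.
            exit(0);
        }
        $line = rtrim($l, "\r\n");
        if ($line == "end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    // Drop the trailing newline added after the last line.
    return substr($msg, 0, -1);
}

function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
    // Make sure Storm sees the message right away.
    flush();
}

function write_line($line) {
    echo("$line\n");
}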
Tip
The use of flush() is very important, as there might be a buffer that won’t be flushed until a specific number of characters has accumulated. This means that your script can hang forever waiting for input from Storm, which it will never receive, because Storm is in turn waiting for output from your script. So it’s important to make sure that when your script outputs something it gets flushed immediately.
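The framing rule (a message is complete when a line containing only end arrives) and the need to flush after every write can be illustrated outside PHP as well. The following Java sketch is only an illustration, not code taken from Storm; MessageFraming, readMessage and sendMessage are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintStream;
import java.io.StringReader;
import java.io.UncheckedIOException;

// Hypothetical sketch of multilang message framing.
final class MessageFraming {
    // Collects lines until a line equal to "end" and returns them joined
    // by newlines. Returns null if the stream ends before "end" is seen.
    static String readMessage(BufferedReader in) {
        StringBuilder msg = new StringBuilder();
        boolean first = true;
        try {
            String line;
            while ((line = in.readLine()) != null) {
                if (line.equals("end")) {
                    return msg.toString();
                }
                if (!first) {
                    msg.append('\n');
                }
                msg.append(line);
                first = false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return null;
    }

    // Writes the payload followed by the "end" line and flushes at once,
    // so the peer is never left waiting on a half-filled buffer.
    static void sendMessage(PrintStream out, String json) {
        out.print(json + "\nend\n");
        out.flush();
    }

    public static void main(String[] args) {
        BufferedReader in = new BufferedReader(
                new StringReader("{\"command\": \"next\"}\nend\n"));
        System.out.println(readMessage(in));
        sendMessage(System.out, "{\"command\": \"sync\"}");
    }
}
```

Returning null at end of input gives the caller a way to stop its loop when the other side closes the pipe.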

Start looping, and read or write tuples.

This is the most important step, where all the work gets done. The implementation of this step depends on whether we are developing a spout or a bolt.

In the case of a spout, we should start emitting tuples. In the case of a bolt, we loop and read tuples, process them, and emit, ack or fail.

Let’s see the implementation of the spout that emits numbers.

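$from = intval($argv[1]);
$to = intval($argv[2]);

while (true) {
    $msg = read_msg();

    $cmd = json_decode($msg, true);
    if ($cmd['command'] == 'next') {
        // Use <= so that the to value itself is emitted as well.
        if ($from <= $to) {
            storm_emit(array("$from"));
            // Storm replies with the IDs of the tasks that received the tuple.
            $task_ids = read_msg();
            $from++;
        } else {
            // Nothing left to emit.
            sleep(1);
        }
    }
    storm_sync();
}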

We get from and to from the command line arguments, and then start iterating. Every time we get a next message from Storm, it means we can emit a new tuple.

Once we’ve sent all the numbers, and we don’t have more tuples to send, we just sleep for some time.

To read a command we just call read_msg() and JSON decode it. In order to make sure the script is ready for the next command, Storm waits for a sync message from the script before sending the next one. In our script, storm_sync() sends it as {"command": "sync"}.
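The decision the spout makes for each command can be separated from the I/O, which makes it easy to reason about. The Java sketch below models only that decision, under some assumptions: the command name has already been extracted from the JSON message, the range is inclusive, and the task ID reply and the sleep are left out. NumberSpoutLogic and onCommand are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the spout loop: one call per command from Storm.
final class NumberSpoutLogic {
    private int next;
    private final int to;

    NumberSpoutLogic(int from, int to) {
        this.next = from;
        this.to = to;
    }

    // Returns the JSON messages to send back for a single command.
    // Every reply ends with a sync message, whatever the command was.
    List<String> onCommand(String command) {
        List<String> out = new ArrayList<>();
        if (command.equals("next") && next <= to) {
            // The number is emitted as a string, as in the PHP script.
            out.add("{\"command\": \"emit\", \"tuple\": [\"" + next + "\"]}");
            next++;
        }
        out.add("{\"command\": \"sync\"}");
        return out;
    }

    public static void main(String[] args) {
        NumberSpoutLogic spout = new NumberSpoutLogic(1, 2);
        for (int i = 0; i < 3; i++) {
            for (String msg : spout.onCommand("next")) {
                System.out.println(msg);
            }
        }
    }
}
```

Once the range is exhausted, a next command produces only the sync message, which matches the branch where the PHP script sleeps instead of emitting.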

In the case of bolts, this is a little different.

while (true) {
    $msg = read_msg();
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
    if (!empty($tuple["id"])) {
        if (isPrime($tuple["tuple"][0])) {
            storm_emit(array($tuple["tuple"][0]));
        }
        storm_ack($tuple["id"]);
    }
}

We loop, reading tuples from standard input. As soon as we get a message, we JSON decode it. If it is a tuple, we process it, checking if it is a prime number.

In case it is a prime number, we emit that number, otherwise we just ignore it.

In any case we ack the tuple.
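The rule "emit if prime, always ack" can be written down as a small pure function. The Java sketch below is an illustration only: it assumes the tuple ID and value have already been extracted from the JSON message, and PrimeFilterLogic, isPrime and onTuple are hypothetical names. Unlike the naive check used in this chapter, its trial division stops at the square root of the number, a small variation that gives the same answers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of what the bolt does with one incoming tuple.
final class PrimeFilterLogic {
    // Trial division, stopping once i * i exceeds n.
    static boolean isPrime(long n) {
        if (n < 2) {
            return false;
        }
        for (long i = 2; i * i <= n; i++) {
            if (n % i == 0) {
                return false;
            }
        }
        return true;
    }

    // Returns the JSON messages to send for one tuple: an emit message
    // only for primes, followed by an ack message in every case.
    static List<String> onTuple(String id, long value) {
        List<String> out = new ArrayList<>();
        if (isPrime(value)) {
            out.add("{\"command\": \"emit\", \"tuple\": [" + value + "]}");
        }
        out.add("{\"command\": \"ack\", \"id\": \"" + id + "\"}");
        return out;
    }

    public static void main(String[] args) {
        for (String msg : onTuple("42", 7)) {
            System.out.println(msg);
        }
    }
}
```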

Tip
The use of JSON_BIGINT_AS_STRING in the json_decode function is a workaround for a conversion problem between Java and PHP. Java sends some very big numbers, and in PHP they get decoded with less precision, which can cause problems. To work around this problem, we tell PHP to decode big numbers as strings, and avoid using double quotes when printing numbers in JSON messages. Note that PHP 5.4.0 or higher is required for this parameter to work.

Messages like emit, ack, fail and log have the following structure:

Emit
{
    "command": "emit",
    "tuple": ["foo", "bar"]
}

Where the array has the values we are emitting for the tuple.

Ack
{
    "command": "ack",
    "id": 123456789
}

Where the id is the ID of the tuple we are processing.

Fail
{
    "command": "fail",
    "id": 123456789
}

Same as ack, the id is the ID of the tuple we are processing.

Log
{
    "command": "log",
    "msg": "some message to be logged by storm."
}
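One detail worth keeping in mind when building these messages by hand: a JSON string cannot contain a raw newline, and a stray line containing only end would terminate the message early, so free text such as a log message has to be escaped. PHP's json_encode takes care of this. The Java sketch below shows the idea for a language without a built-in JSON encoder; MultilangMessages and its methods are hypothetical names, and the escaping covers only quotes, backslashes and control characters.

```java
// Hypothetical builders for ack, fail and log messages.
final class MultilangMessages {
    // Minimal JSON string escaping: quotes, backslashes, control characters.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the 4-digit hex form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    static String ack(String id) {
        return "{\"command\": \"ack\", \"id\": \"" + escape(id) + "\"}";
    }

    static String fail(String id) {
        return "{\"command\": \"fail\", \"id\": \"" + escape(id) + "\"}";
    }

    static String log(String msg) {
        return "{\"command\": \"log\", \"msg\": \"" + escape(msg) + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(ack("123456789"));
        System.out.println(log("line one\nend"));
    }
}
```

With the escaping in place, a log message that happens to contain a line reading end stays on a single line and cannot be mistaken for the end-of-message marker.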

Putting it all together gives us the following PHP scripts.

For our spout:

<?php
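function read_msg() {
    $msg = "";
    while (true) {
        $l = fgets(STDIN);
        if ($l === false) {
            // Storm closed the pipe, so there is nothing left to read.
            exit(0);
        }
        $line = rtrim($l, "\r\n");
        if ($line == "end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    // Drop the trailing newline added after the last line.
    return substr($msg, 0, -1);
}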

function write_line($line) {
    echo("$line\n");
}

function storm_emit($tuple) {
    $msg = array("command" => "emit", "tuple" => $tuple);
    storm_send($msg);
}

function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
    // Make sure Storm sees the message right away.
    flush();
}

function storm_sync() {
    storm_send(array("command" => "sync"));
}

function storm_log($msg) {
    $msg = array("command" => "log", "msg" => $msg);
    storm_send($msg);
    flush();
}

$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];

$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();

$from = intval($argv[1]);
$to = intval($argv[2]);

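while (true) {
    $msg = read_msg();

    $cmd = json_decode($msg, true);
    if ($cmd['command'] == 'next') {
        // Use <= so that the to value itself is emitted as well.
        if ($from <= $to) {
            storm_emit(array("$from"));
            // Storm replies with the IDs of the tasks that received the tuple.
            $task_ids = read_msg();
            $from++;
        } else {
            // Nothing left to emit.
            sleep(1);
        }
    }
    storm_sync();
}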
?>

And for our bolt:

<?php
function isPrime($number) {
    if ($number < 2) {
        return false;
    }
    if ($number==2) {
        return true;
    }
    for ($i=2; $i<=$number-1; $i++) {
        if($number % $i == 0) {
            return false;
        }
    }
    return true;
}
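function read_msg() {
    $msg = "";
    while (true) {
        $l = fgets(STDIN);
        if ($l === false) {
            // Storm closed the pipe, so there is nothing left to read.
            exit(0);
        }
        $line = rtrim($l, "\r\n");
        if ($line == "end") {
            break;
        }
        $msg = "$msg$line\n";
    }
    // Drop the trailing newline added after the last line.
    return substr($msg, 0, -1);
}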

function write_line($line) {
    echo("$line\n");
}

function storm_emit($tuple) {
    $msg = array("command" => "emit", "tuple" => $tuple);
    storm_send($msg);
}

function storm_send($json) {
    write_line(json_encode($json));
    write_line("end");
    // Make sure Storm sees the message right away.
    flush();
}


function storm_ack($id) {
    storm_send(["command"=>"ack", "id"=>"$id"]);
}

function storm_log($msg) {
    $msg = array("command" => "log", "msg" => "$msg");
    storm_send($msg);
}

$config = json_decode(read_msg(), true);
$heartbeatdir = $config['pidDir'];

$pid = getmypid();
fclose(fopen("$heartbeatdir/$pid", "w"));
storm_send(["pid"=>$pid]);
flush();

while (true) {
    $msg = read_msg();
    $tuple = json_decode($msg, true, 512, JSON_BIGINT_AS_STRING);
    if (!empty($tuple["id"])) {
        if (isPrime($tuple["tuple"][0])) {
            storm_emit(array($tuple["tuple"][0]));
        }
        storm_ack($tuple["id"]);
    }
}
?>
Tip
It is important to put all these scripts in a special folder called multilang/resources in our project directory. This folder gets included in the jar file that is sent to the workers. If we don’t put the scripts in that folder, Storm won’t be able to run them and will report an error.