Real-time Analytics with Storm and Cassandra (2015)

Chapter 3. Understanding Storm Internals by Examples

This chapter is dedicated to helping you understand the internals of Storm and how it works, using practical examples. The intent is to get you accustomed to writing your own spouts, to walk you through reliable and non-reliable topologies, and to acquaint you with the various groupings provided by Storm.

The topics that will be covered in the chapter are as follows:

·        Storm spouts and custom spouts

·        Anchoring and acking

·        Different stream groupings

By the end of this chapter, you should be able to understand the various groupings and the concept of reliability achieved by anchoring, and you will be able to create your own spouts.

Customizing Storm spouts

You explored and understood the WordCount topology provided by the Storm-starter project in the previous chapters. Now it's time to move on to the next step, the do-it-yourself journey with Storm; so let's take the next leap and do some exciting stuff with our own spouts that read from various sources.

Creating FileSpout

Here we will create our own spout to read events or tuples from a file source and emit them into the topology; it will substitute for the RandomSentenceSpout we used in the WordCount topology in the previous chapter.

To start, copy the project we created in Chapter 2, Getting Started with Your First Topology, into a new project, and make the following changes in RandomSentenceSpout to create a new class called FileSpout within the Storm-starter project.

Now we will make changes in FileSpout so that it reads sentences from a file as shown in the following code:

public class FileSpout extends BaseRichSpout {

  //declaration section
  SpoutOutputCollector _collector;
  DataInputStream in;
  BufferedReader br;
  Queue<String> qe;

  //constructor
  public FileSpout() {
    qe = new LinkedList<String>();
  }

  //the messageId builder method
  private String getMsgId(int i) {
    return "MsgId" + i;
  }

  //called for every line read by the readFile method; appends a
  //messageId (separated by the #@# delimiter) to the end of each
  //line and then adds the line to the linked list
  private void queueIt() {
    int msgId = 0;
    String strLine;
    try {
      while ((strLine = br.readLine()) != null) {
        qe.add(strLine + "#@#" + getMsgId(msgId));
        msgId++;
      }
    } catch (IOException e) {
      e.printStackTrace();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  //reads lines from the file at the specified location
  private void readFile() {
    try {
      FileInputStream fstream = new FileInputStream("/home/mylog");
      in = new DataInputStream(fstream);
      br = new BufferedReader(new InputStreamReader(in));
      queueIt();
      System.out.println("FileSpout file reading done");
    } catch (FileNotFoundException e) {
      e.printStackTrace();
    }
  }

  //open is called at the time of spout initialization; it calls the
  //readFile method, which reads the file and adds events to the
  //linked list to be fed to the spout as tuples
  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    readFile();
  }

  //this method is called every 100 ms; it polls the list for the next
  //message, which is read off and emitted into the topology as the
  //next tuple. When the queue doesn't have any events, it reads the
  //file again by calling the readFile method
  @Override
  public void nextTuple() {
    Utils.sleep(100);
    String fullMsg = qe.poll();
    if (fullMsg != null) {
      String[] msg = fullMsg.split("#@#");
      _collector.emit(new Values(msg[0]));
      System.out.println("nextTuple done " + msg[1]);
    } else {
      readFile();
    }
  }

  @Override
  public void ack(Object id) {}

  @Override
  public void fail(Object id) {}

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}
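The tagging scheme used by queueIt and nextTuple can be verified in isolation. The following sketch (plain Java with no Storm dependencies, and a simplified messageId builder) reproduces the append-then-split round trip on the #@# delimiter:

```java
import java.util.LinkedList;
import java.util.Queue;

public class MsgIdRoundTrip {

    // simplified messageId builder, mirroring the spout's scheme
    static String getMsgId(int i) {
        return "MsgId" + i;
    }

    public static void main(String[] args) {
        Queue<String> qe = new LinkedList<String>();

        // producer side (queueIt): tag each line with the #@# delimiter
        String[] lines = {"the cow jumped over the moon", "an apple a day"};
        for (int i = 0; i < lines.length; i++) {
            qe.add(lines[i] + "#@#" + getMsgId(i));
        }

        // consumer side (nextTuple): split the payload from the messageId
        String fullMsg = qe.poll();
        String[] msg = fullMsg.split("#@#");
        System.out.println(msg[0]); // prints "the cow jumped over the moon"
        System.out.println(msg[1]); // prints "MsgId0"
    }
}
```

Note that this only works as long as the delimiter string never occurs inside a line of the source file.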

Tip

Downloading the example code

You can download the example code files for all the Packt books you have purchased from your account at http://www.packtpub.com. If you purchased this book elsewhere, you can visit http://www.packtpub.com/support and register to have the files e-mailed directly to you.

Tweaking WordCount topology to use FileSpout

Now we need to fit FileSpout into our WordCount topology and execute it. To do this, you need to change one line of code in WordCount topology and instantiate FileSpout instead of RandomSentenceSpout in TopologyBuilder, as shown here:

public static void main(String[] args) throws Exception {

  TopologyBuilder builder = new TopologyBuilder();

//builder.setSpout("spout", new RandomSentenceSpout(), 5);

  builder.setSpout("spout", new FileSpout(), 1);

This one-line change will take care of instantiating the new spout, which will read from the specified file, /home/mylog (please create this file before you execute the program). Here is a screenshot of the output for your reference:

Tweaking WordCount topology to use FileSpout

The SocketSpout class

As the next step in understanding spouts better, let's create a SocketSpout class. Assuming that you are proficient in writing a socket server or producer, I will walk you through the process of creating a custom SocketSpout class to consume a socket output in the Storm topology:

public class SocketSpout extends BaseRichSpout {

  static SpoutOutputCollector _collector;

  //the socket
  static Socket myclientSocket;
  static ServerSocket myserverSocket;
  static int myport;

  public SocketSpout(int port) {
    myport = port;
  }

  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    try {
      myserverSocket = new ServerSocket(myport);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  public void nextTuple() {
    try {
      myclientSocket = myserverSocket.accept();
      InputStream incomingIS = myclientSocket.getInputStream();
      byte[] b = new byte[8196];
      int len = incomingIS.read(b);
      //emit only the bytes actually read, as a string
      _collector.emit(new Values(new String(b, 0, len)));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  //declare the output stream (required; the field name is illustrative)
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("line"));
  }
}
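Since SocketSpout acts as a socket server, any client that connects to the configured port and writes bytes becomes a producer for the topology. A minimal producer sketch might look like this (the host and port here are assumptions and must match the port passed to the SocketSpout constructor):

```java
import java.io.OutputStream;
import java.net.Socket;

public class SocketProducer {
    public static void main(String[] args) throws Exception {
        // hypothetical host/port; must match the SocketSpout's server socket
        Socket socket = new Socket("localhost", 5555);
        OutputStream out = socket.getOutputStream();
        // each connection delivers one payload to the spout's nextTuple
        out.write("hello storm\n".getBytes());
        out.flush();
        socket.close();
    }
}
```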

Anchoring and acking

We have talked about the DAG that is created for the execution of a Storm topology. Now, when you are designing your topologies to cater to reliability, there are two items that need to be added to Storm:

·        Whenever a new link, that is, a new stream is being added to the DAG, it is called anchoring

·        When the tuple is processed in entirety, it is called acking

When Storm knows these preceding facts, then during the processing of tuples it can gauge whether they have been completely processed or not, and accordingly fail or acknowledge them.

Let's take a look at the following WordCount topology bolts to understand the Storm API anchoring and acking better:

·        SplitSentenceBolt: The purpose of this bolt is to split the sentence into different words and emit them. Now let's examine the output declarer and the execute methods of this bolt in detail (especially the numbered lines), as shown in the following code:

  public void execute(Tuple tuple) {
      String sentence = tuple.getString(0);
      for (String word : sentence.split(" ")) {
          _collector.emit(tuple, new Values(word)); //1
      }
      _collector.ack(tuple); //2
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word")); //3
  }

The output declarer functionality of the preceding code is elaborated as follows:

·        _collector.emit: Here each tuple being emitted by the bolt on the stream called word (the second argument of the method) is anchored using the first argument of the method (the tuple). In this arrangement, if a failure occurs, the tuple anchored at the root of the tree would be replayed by the spout.

·        _collector.ack: Here we are informing Storm that the tuple has been processed successfully by this bolt. In the event of a failure, the programmer can explicitly call a fail method, or Storm calls it internally, as in the case of timeout events, so that the tuple can be replayed.

·        declarer.declare: This is the method called to specify the stream on which successfully processed tuples would be emitted. Notice that we have used the same word stream in the _collector.emit method. Similarly, if you look into the WordCount topology's Builder method, you'd find another piece in the overall integration of the word stream, which is as follows:

  builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

The unreliable topology

Now let's look at the unreliable version of the same topology. Here, if the tuple fails to be processed by Storm in entirety, it is not replayed by the framework. The code which we used previously, in this topology, would look like this:

  _collector.emit(new Values(word));

Thus, an un-anchored tuple is emitted by the bolt. Sometimes, to handle various programming needs, developers deliberately create unreliable topologies.

Stream groupings

Next we need to get acquainted with various stream groupings (a stream grouping is basically the mechanism that defines how Storm partitions and distributes the streams of tuples amongst tasks of bolts) provided by Storm. Streams are the basic wiring component of a Storm topology, and understanding them provides a lot of flexibility to the developer to handle various problems in programs efficiently.

Local or shuffle grouping

Local or shuffle grouping is the most common grouping; it randomly distributes the tuples emitted by the source, ensuring equal distribution, that is, each instance of the bolt gets to process roughly the same number of events. Load balancing is automatically taken care of by this grouping.

Due to the random nature of distribution of this grouping, it's useful only for atomic operations. It is specified with a single parameter: the source of the stream. The following snippet is from the WordCount topology (which we created earlier), and it demonstrates the usage of shuffle grouping:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new RandomSentenceSpout(), 5);

builder.setBolt("split", new SplitSentence(),  8).shuffleGrouping("spout");

builder.setBolt("count", new WordCount(),  12).fieldsGrouping("split", new Fields("word"));

In the following figure, shuffle grouping is depicted:

Local or shuffle grouping

Here Bolt A and Bolt B both have a parallelism of two, each; so two instances of each of these bolts is spawned by the Storm framework. These bolts are wired together by shuffle grouping. We will now discuss the distribution of events.

Fifty percent of the events from Instance 1 of Bolt A would go to Instance 1 of Bolt B, and the remaining 50 percent would go to Instance 2 of Bolt B. Similarly, 50 percent of the events emitted by Instance 2 of Bolt A would go to Instance 1 of Bolt B, and the remaining 50 percent would go to Instance 2 of Bolt B.
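The even split described above can be sketched with a small simulation (plain Java, not the Storm API). Storm's real shuffle grouping randomizes the target for each tuple, but the long-run effect is the balanced, dealt-out distribution modeled here:

```java
import java.util.Arrays;

public class ShuffleSketch {
    public static void main(String[] args) {
        int boltBInstances = 2;
        int[] counts = new int[boltBInstances];

        // deal 100 tuples from one Bolt A instance across the Bolt B
        // instances; each instance ends up with the same share of events
        for (int tuple = 0; tuple < 100; tuple++) {
            counts[tuple % boltBInstances]++;
        }
        System.out.println(Arrays.toString(counts)); // prints "[50, 50]"
    }
}
```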

Fields grouping

In this grouping, we specify two parameters: the source of the stream and the fields. The values of the fields are actually used to control the routing of the tuples to various bolts. This grouping guarantees that, for the same field value, the tuple will always be routed to the same instance of the bolt.

In the following figure, fields grouping is depicted between Bolt A and Bolt B, and each of these bolts has two instances. Notice the flow of events based on the value of the fields grouping parameter.

Fields grouping

All the events from Instance 1 and Instance 2 of Bolt A, where the value of the field is P, are sent to Instance 1 of Bolt B.

All the events from Instance 1 and Instance 2 of Bolt A, where the value of the field is Q, are sent to Instance 2 of Bolt B.
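The routing guarantee can be illustrated with a hash-modulo sketch in plain Java. Storm's internal hashing differs from this, but the invariant it provides is the same: the same field value always maps to the same target instance, no matter which upstream instance emitted the tuple:

```java
public class FieldsGroupingSketch {

    // conceptual routing: derive a Bolt B instance from the field value
    static int chooseInstance(String fieldValue, int numInstances) {
        return Math.abs(fieldValue.hashCode() % numInstances);
    }

    public static void main(String[] args) {
        int instances = 2;
        // the same word routes to the same instance on every emit
        System.out.println(chooseInstance("storm", instances)
                == chooseInstance("storm", instances)); // prints "true"
    }
}
```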

All grouping

All grouping is a kind of broadcaster grouping that can be used in situations where the same message needs to be sent to all instances of the destination bolt. Here, each tuple is sent to all the instances of the bolt.

This grouping should be used only in very specific cases, for specific streams, where we want the same information to be replicated to all bolt instances downstream. Let's take a use case where a bolt holds some information related to a country and its currency value, and the bolts downstream need this information for currency conversion. Whenever the currency bolt has any changes, it uses all grouping to publish them to all the instances of the following bolts:

All grouping

Here we have a diagrammatic representation of all grouping, where all the tuples from Bolt A are sent to all the instances of Bolt B.

Global grouping

Global grouping makes sure that the entire stream from the source component (spout or bolt) goes to a single instance of the target bolt; to be more precise, it goes to the instance of the target bolt with the lowest ID. Let's understand the concept with an example; say my topology is as follows:

Global grouping

I will assign the following parallelism to the components:

Global grouping

Also, I will use the following stream groupings:

Global grouping

Then, the framework will direct all the data emitted by the instances of myboltA onto one instance of myboltB, the one to which Storm assigned the lower ID at instantiation:

Global grouping

As in the preceding figure, in the case of global grouping, all tuples from both instances of Bolt A would go to Instance 1 of Bolt B, assuming it has a lower ID than Instance 2 of Bolt B.

Note

Storm basically assigns IDs to each instance of a bolt or spout that it creates in the topology. In global grouping, the allocations are directed to the instance that has a lower value on the ID allocated from Storm.

Custom grouping

Storm, being an extensible framework, provides developers the facility to create their own stream grouping. This can be done by providing an implementation of the backtype.storm.grouping.CustomStreamGrouping interface.
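The real backtype.storm.grouping.CustomStreamGrouping interface has prepare and chooseTasks methods; the sketch below models only the chooseTasks routing decision as plain Java (no Storm dependencies), using the first-letter rule from the exercise at the end of this chapter, where all words starting with the same letter must go to the same task:

```java
import java.util.Arrays;
import java.util.List;

public class FirstLetterGroupingSketch {

    // models CustomStreamGrouping.chooseTasks: map a word to one of the
    // available target task IDs based on its first letter
    static Integer chooseTask(String word, List<Integer> targetTasks) {
        int index = Math.abs(word.charAt(0) % targetTasks.size());
        return targetTasks.get(index);
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(4, 5); // hypothetical task IDs
        // words sharing a first letter always land on the same task
        System.out.println(chooseTask("storm", tasks)
                .equals(chooseTask("spout", tasks))); // prints "true"
    }
}
```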

Direct grouping

In this kind of grouping, the Storm framework gives the sender component (spout or bolt) the ability to decide which task of the consumer bolt would receive a tuple while the sender is emitting it to the stream.

The tuple must be emitted to the stream using a special emitDirect method, and the task of the consuming component has to be specified (note that the task IDs can be fetched using the TopologyContext methods).

Quiz time

Q.1 State whether the following statements are true or false:

1.    All components of reliable topologies use anchoring.

2.    In the event of a failure, all the tuples are played back again.

3.    Shuffle grouping does load balancing.

4.    Global grouping is like a broadcaster.

Q.2 Fill in the blanks:

1.    _______________ is the method to tell the framework that the tuple has been successfully processed.

2.    The _______________ method specifies the name of the stream.

3.    The ___________ method is used to push the tuple downstream in the DAG.

Make changes to WordCount topology of the Storm-starter project to create a custom grouping so that all words starting from a particular letter always go to same instance of the WordCount bolt.

Summary

In this chapter, we have understood the intricacies of Storm spouts. We also created a custom file spout and integrated it with the WordCount topology. We also introduced you to the concepts of reliability, acking, and anchoring. The knowledge of the various groupings provided by the current version of Storm further enhances a user's ability to explore and experiment.

In the next chapter, we shall get you acquainted with the clustered setup of Storm as well as give you an insight on various monitoring tools of clustered mode.