Real-time Analytics with Storm and Cassandra (2015)

Chapter 9. Storm Management and Maintenance

In this chapter, you will learn how to scale a Storm cluster. You will also see how to tune the number of workers and the parallelism of a Storm topology.

We will cover the following topics:

·        Adding new supervisor nodes

·        Setting up workers and parallelism to enhance processing

·        Troubleshooting

Scaling the Storm cluster – adding new supervisor nodes

In production, one of the most common scenarios one can run into is when the processing need outgrows the size of the cluster. Scaling then becomes necessary; there are two options: we can perform vertical scaling wherein we can add more compute capability, or we can use horizontal scaling where we add more nodes. The latter is more cost-effective and also makes the cluster more robust.

Here are the steps to be executed to add a new node to the Storm cluster:

1.    Download Storm 0.9.2, the version used in the rest of the cluster, and install it by extracting the downloaded ZIP file.

2.    Create the required directories:

sudo mkdir -p /usr/local/storm/tmp

3.    All Storm nodes, both Nimbus and the supervisors, require a location on the local disk to store a small amount of configuration-related data. Ensure that you create the directory and assign read/write permissions on all Storm nodes.

4.    Create the required directories for the logs, as follows:

sudo mkdir -p /mnt/app_logs/storm/storm_logs

5.    Update the storm.yaml file with necessary changes for Nimbus and Zookeeper:

# storm.zookeeper.servers: This is a list of the hosts in the Zookeeper cluster for the Storm cluster
storm.zookeeper.servers:
  - "<IP_ADDRESS_OF_ZOOKEEPER_ENSEMBLE_NODE_1>"
  - "<IP_ADDRESS_OF_ZOOKEEPER_ENSEMBLE_NODE_2>"
# storm.zookeeper.port: Port on which the Zookeeper cluster is running
storm.zookeeper.port: 2182
# For our installation, we are going to create this directory in the /usr/local/storm/tmp location
storm.local.dir: "/usr/local/storm/tmp"
# nimbus.host: The nodes need to know which machine is the master in order to
# download topology jars and confs. This property is used for the same purpose.
nimbus.host: "<IP_ADDRESS_OF_NIMBUS_HOST>"
# storm.messaging.netty configurations: Storm's Netty-based transport has been
# overhauled to significantly improve performance through better utilization of
# thread, CPU, and network resources, particularly in cases where message sizes
# are small. In order to provide Netty support, the following configurations
# need to be added:
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880
storm.messaging.netty.max_retries: 100
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100

The supervisor slot ports are configured as follows:

supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

6.    Set the STORM_HOME environment variable in the ~/.bashrc file and add Storm's bin directory to the PATH environment variable, so that the Storm binaries can be executed from any location. The entries to be added are as follows:

export STORM_HOME=/usr/local/storm
export PATH=$PATH:$STORM_HOME/bin

7.    Update /etc/hosts on each of the following machines:

o   The nimbus machine: This is done to add an entry for the new supervisor that's being added

o   All existing supervisor machines: This is done to add an entry for the new supervisor that's being added

o   The new supervisor node: This is done to add the nimbus entry, to add the entry for all other supervisors, and to add an entry for the Zookeeper node

Here is a sample snippet for the IP 10.46.205.248 and the sup-flm-1.mydomain.com host:

10.192.206.160    sup-flm-2.mydomain.net

10.4.27.405       nim-zkp-flm-3.mydomain.net

Once the supervisor has been added, start the process and it should be visible on the UI, as shown in the following screenshot:

Scaling the Storm cluster – adding new supervisor nodes

Note that the first row in the preceding screenshot points to the newly added supervisor; it has 16 slots in total, none of which are in use because it has only just been added to the cluster.

Scaling the Storm cluster and rebalancing the topology

Once a new supervisor is added, the next obvious step is to rebalance the topologies running on the cluster so that the load is shared with the newly added supervisor.

Rebalancing using the GUI

The Rebalance option is available on the Storm UI, where you can choose the topology to be rebalanced and then trigger the rebalance from the GUI. The topology drains for the specified timeout. During that period, it stops accepting messages from the spout while the tuples already in the internal queues are processed; once the queues are completely clear, the workers and tasks are redistributed. The user also has the option to increase or decrease the parallelism of the various bolts and spouts using the rebalance options. The following screenshot describes how to rebalance a topology using the Storm UI options:

Rebalancing using the GUI

Rebalancing using the CLI

The second option for rebalancing is using the Storm CLI. The command for this is as follows:

storm rebalance mystormtopology -n 5 -e my-spout=3 -e my-bolt=10

Here, -n specifies the number of workers allocated to the topology after the rebalance, -e my-spout refers to the parallelism assigned to the spout, and similarly -e my-bolt refers to the parallelism assigned to the bolt. In the preceding command, we executed the Storm shell from the bin directory under the Storm installation, and rebalanced the topology while also changing the parallelism of the spout and the bolt.

The changes made by the preceding command can be verified from the Storm UI.
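When rebalancing is scripted, it can help to assemble the command line programmatically. The following is a minimal sketch that only builds the argument list for the command shown above and does not run it; the function name build_rebalance_command is hypothetical, and the optional -w flag is the CLI's wait time in seconds before the redistribution happens.

```python
def build_rebalance_command(topology, workers=None, parallelism=None, wait_secs=None):
    """Return the argv list for a `storm rebalance` invocation (nothing is executed)."""
    argv = ["storm", "rebalance", topology]
    if wait_secs is not None:
        # -w: seconds the topology is given to drain before redistribution
        argv += ["-w", str(wait_secs)]
    if workers is not None:
        # -n: number of workers after the rebalance
        argv += ["-n", str(workers)]
    for component, executors in (parallelism or {}).items():
        # -e component=parallelism, one flag per spout or bolt
        argv += ["-e", f"{component}={executors}"]
    return argv


if __name__ == "__main__":
    cmd = build_rebalance_command(
        "mystormtopology", workers=5, parallelism={"my-spout": 3, "my-bolt": 10}
    )
    print(" ".join(cmd))
```

The resulting list can be handed to subprocess.run on a machine where the storm binary is on the PATH.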

Setting up workers and parallelism to enhance processing

Storm is a highly scalable, distributed, and fault-tolerant real-time parallel processing compute framework. Note that the emphasis is on scalable, distributed, and parallel processing. We already know that Storm operates in clustered mode and is therefore distributed by nature. Scalability was covered in the previous section; now, let's have a closer look at parallelism. We introduced you to this concept in an earlier chapter, but now we'll get you acquainted with how to tweak it to achieve the desired performance. The following points are the key criteria for this:

·        A topology is allocated a certain number of workers at the time it's started.

·        Each component in the topology (bolts and spouts) has a specified number of executors associated with it. These executors specify the number or degree of parallelism for each running component of the topology.

·        The efficiency and speed of Storm are driven by its parallelism, but we need to understand one thing: all the executors that contribute to parallelism run within the limited set of workers allocated to the topology. So, increasing the parallelism helps achieve efficiency only up to a point; beyond that, the executors compete for the workers' resources. Past this point, increasing parallelism further would not improve efficiency, but increasing the number of workers allocated to the topology would.
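To get a feel for how crowded the workers become, the points above can be turned into a rough calculation. This is only a sketch: it assumes the executors are spread as evenly as possible across the workers and ignores any system executors such as ackers. The function name executors_per_worker is hypothetical.

```python
def executors_per_worker(parallelism, num_workers):
    """Spread the topology's executors as evenly as possible over its workers.

    parallelism: dict mapping component name -> number of executors.
    Returns a list with the executor count for each worker.
    """
    total = sum(parallelism.values())
    base, extra = divmod(total, num_workers)
    # The first `extra` workers carry one additional executor.
    return [base + 1 if i < extra else base for i in range(num_workers)]


if __name__ == "__main__":
    topology = {"my-spout": 3, "my-bolt": 10}
    print(executors_per_worker(topology, 2))
    print(executors_per_worker(topology, 5))
```

With the same 13 executors, moving from 2 to 5 workers lowers the number of executors competing inside each worker process, which is the effect described in the last point.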

Another point to understand in terms of efficiency is network latency; we'll explore this in the following sections.

Scenario 1

The following figure illustrates a simple topology with three moving components: one spout and two bolts. Here, all the components are executing on separate nodes in the cluster, so every tuple has to make two network hops to complete its execution.

Scenario 1

Let's say we are not satisfied with the throughput and decide to increase the parallelism. The question that immediately arises is where to increase it and by how much. This can be decided based on the capacity of the bolt, which is visible on the Storm UI. The following screenshot illustrates this:

Scenario 1

Here, the circled value is the capacity of the second bolt. It is around 0.9 and already shown in red, which means this bolt is overworked and increasing its parallelism should help. A topology would typically fall behind and stop acking in time once the bolt capacity crosses 1. The next scenario provides a solution for this issue.
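The capacity figure shown on the UI is derived from the number of tuples executed and the average execute latency over the measurement window. The sketch below reproduces that arithmetic and adds a rough sizing helper; the helper assumes that load spreads evenly over the executors, and the target capacity of 0.5 as well as both function names are hypothetical choices for illustration.

```python
import math


def bolt_capacity(executed, execute_latency_ms, window_secs=600):
    """Fraction of the window the bolt spent executing tuples (10-minute window by default)."""
    return (executed * execute_latency_ms) / (window_secs * 1000.0)


def suggested_executors(current_executors, capacity, target_capacity=0.5):
    """Executors needed to bring capacity down to the target, assuming even load."""
    return max(current_executors, math.ceil(current_executors * capacity / target_capacity))


if __name__ == "__main__":
    cap = bolt_capacity(executed=270000, execute_latency_ms=2.0)
    print(round(cap, 2))
    print(suggested_executors(2, cap))
```

In this made-up example, 270,000 tuples at 2 ms each keep the bolt busy for 540 of the 600 seconds in the window, which is where a capacity close to 0.9 comes from.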

Scenario 2

Here, we have acted on the realization that Bolt B is overloaded and have increased its parallelism, as shown in the following figure:

Scenario 2

The preceding figure shows one possible distribution of the instances of the bolts and spouts across different nodes in the cluster. We observed the capacity, concluded that one bolt was overloaded, and by brute force increased the parallelism of only that bolt.

Now, having done this, we have achieved the required parallelism; let's now have a look at the network latency, in terms of how many tuples are moving between nodes (internode communication is a mandatory element in a distributed computing setup):

·        50 percent of the traffic is hopping between spouts on Machine 1 and Machine 2

·        50 percent of the traffic is hopping between Machine 1 and Machine 3

·        100 percent of the traffic is hopping between Machine 2 and Machine 3

Now let's see another illustration with a slight variation in the parallelism.

Scenario 3

Scenario 3 is the optimal arrangement possible in this example setup, one in which we use both the network and the parallelism very efficiently, as shown in the following figure:

Scenario 3

The preceding figure illustrates where we get the maximum benefit from parallelism. If you look at it, you'll see that we have achieved efficiency with no network hop: the best of both worlds.

What I am trying to illustrate is that parallelism should be changed judiciously, keeping the impact of network latency, hops, and the speed of localized processing in mind.
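The trade-off between the scenarios can be approximated with a small model. This is a sketch under stated assumptions rather than a description of Storm's scheduler: each upstream executor is assumed to emit the same volume, a plain shuffle spreads tuples uniformly over all downstream executors, and the local_first flag roughly imitates a grouping that keeps tuples local whenever a downstream executor is co-located. The machine names and the function name are hypothetical.

```python
def remote_fraction(upstream, downstream, local_first=False):
    """Estimate the share of tuples that cross a machine boundary.

    upstream / downstream: lists of machine names, one entry per executor.
    """
    remote = 0.0
    for u in upstream:
        has_local = any(d == u for d in downstream)
        if local_first and has_local:
            continue  # this executor's output stays on its own machine
        # Uniform shuffle: every downstream executor gets an equal share.
        remote += sum(1 for d in downstream if d != u) / len(downstream)
    return remote / len(upstream)


if __name__ == "__main__":
    # Components on separate machines: every tuple hops.
    print(remote_fraction(["m1"], ["m2"]))
    # Co-located executors with a plain shuffle: half the tuples still hop.
    print(remote_fraction(["m1", "m2"], ["m1", "m2"]))
    # Co-located executors with local delivery preferred: no hops.
    print(remote_fraction(["m1", "m2"], ["m1", "m2"], local_first=True))
```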

Storm troubleshooting

As developers, we need to accept the reality that things do go wrong and debugging is required. This section is going to equip you to handle such situations effectively and efficiently. The first thing is to understand two root mantras of the programming world:

·        Work as if everything that could break will break

·        Anything that could break can be fixed

Having accepted the reality, let's address the situation first by understanding what could fail and then have a clear understanding of where we should start the analysis to help us handle any situation with the Storm cluster. Let's get to grips with the various pointers that show us the problems and thus guide us to prospective solutions.

The Storm UI

First of all, let's understand which statistics and indicators are present on the UI itself. The latest UI has scores of indicators that give us an insight into what is going on in the cluster and what could go wrong (just in case things break).

Let's look at the Cluster Summary section of the Storm UI, which is available at http://<ip of nimbus>:8080. In my case, this is http://10.4.2.122:8080, because my UI process executes on the Nimbus machine, which has the IP 10.4.2.122.

The Storm UI

In the preceding screenshot, we can see the following parameters:

·        The version of Storm being used is in the first column.

·        The uptime of Nimbus (second column) tells us how long the Nimbus node has been running since the last restart. Nimbus, as we know, is required only at the time when the topology is submitted or when a supervisor or worker has gone down and the tasks are being delegated again. Nimbus is also required to be up during the rebalancing of the topology.

·        The third column gives us the number of supervisors on the cluster.

·        Columns four, five, and six show the number of used worker slots, number of free worker slots, and total number of worker slots across the Storm supervisors. This is a very important statistic. In any production grade cluster, one should always have a provision for some of the workers going down or one or two supervisors being killed. So, I recommend that you always have enough free slots on your cluster to accommodate such sudden failures.

·        Columns seven and eight describe the moving parts of the topologies, that is, the number of tasks and executors running in the system.
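The recommendation about free slots can be checked with simple arithmetic on the slot counts from the UI. The following sketch assumes that every worker from a failed supervisor must be restarted on one of the remaining supervisors; the host names and function names are hypothetical.

```python
def survives_loss_of(supervisors, lost_host):
    """supervisors: dict mapping host -> (total_slots, used_slots)."""
    displaced = supervisors[lost_host][1]
    free_elsewhere = sum(
        total - used
        for host, (total, used) in supervisors.items()
        if host != lost_host
    )
    return free_elsewhere >= displaced


def tolerates_any_single_failure(supervisors):
    """True if the workers of any one supervisor fit into the free slots of the others."""
    return all(survives_loss_of(supervisors, host) for host in supervisors)


if __name__ == "__main__":
    full = {"sup-1": (4, 4), "sup-2": (4, 4)}
    roomy = {"sup-1": (4, 2), "sup-2": (4, 2), "sup-3": (4, 0)}
    print(tolerates_any_single_failure(full))
    print(tolerates_any_single_failure(roomy))
```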

Let's have a look at the second section on the Storm UI opening page; this one captures the topology summary:

The Storm UI

This section depicts various parameters Storm captures and displays at the topology level:

·        Column one and column two display the Name field of the topology and the Id field of topology, respectively.

·        Column three reads the status of the topology, which is ACTIVE for a topology that's executing and processing.

·        Column four displays the uptime since the topology has been started.

·        The next three columns display Num workers, Num tasks, and Num executors; these are very important aspects for the performance of the topology. While tuning the performance, one has to realize that just increasing the Num tasks and Num executors values may not result in greater efficiency. If the number of workers is low and we only increase the number of executors and tasks, resource starvation will be high because of the limited number of workers, so the topology performance will deteriorate.

Similarly, if we assign too many workers to a topology with not enough executors and tasks to utilize all of them, we'd waste the precious resources by keeping them blocked and idle.

On the other hand, if we have a high number of workers and a high number of executors and tasks, the chances are that performance may degrade due to network latency.

Having stated these facts, I want to emphasize the fact that the performance tuning should be done cautiously and judiciously to arrive at what number works for the use case we are trying to implement.

The following screenshot captures the details about the supervisors, in terms of the statistics, with the corresponding information:

The Storm UI

·        Column one has the Id field of the supervisors, and column two has the names of the hosts on which the supervisor processes are running.

·        Column three captures the amount of time the supervisor has been running for.

·        Columns five and six capture the number of slots available on the supervisor and the number of slots used, respectively. These two numbers are a very important metric: they help us judge what capacity the supervisors are operating at and how much headroom they have to handle failures. For instance, if all my supervisors are operating at 100 percent capacity, my cluster can't handle any failures.

The following screenshot is captured from the Storm UI depicting supervisors and their attributes:

The Storm UI

The preceding section gives us details about the supervisor slots, timeouts, and so on. These values are specified in storm.yaml, but can be verified from the UI at http://<ip of nimbus>:8080. In my case, this is http://10.4.2.122:8080, because my UI process executes on the Nimbus machine, which has the IP 10.4.2.122, as shown in the following screenshot:

The Storm UI

One can drill deeper into the topology details on the Storm UI by clicking on any of the topology names. The resulting section holds the details about the components of the topology, including the bolts and spouts, as shown in the following screenshot:

The Storm UI

The preceding screenshot has details ranging from the number of executors or tasks allocated to each component, to the number of tuples emitted by the bolts or spouts and the number of tuples transferred to the next component in the Directed Acyclic Graph (DAG).

Other notable details one should observe on the topology detail page are as follows:

·        Capacity of bolts in the last 10 minutes: This should be well below 1.

·        Execute latency (in milliseconds): This shows how long it takes to execute a tuple through this component. If this value is too high, then we would probably want to break the execution into two or more bolts to utilize parallelism and achieve better efficiency.

·        Executed: This stores the number of tuples executed successfully by this component.

·        Process latency: This value displays the average total time taken to execute a tuple by the component. This value should be analyzed with the execute latency. These are practical cases that may happen:

o   Execute latency and Process latency are both low (that's the best possible case)

o   Execute latency is low but process latency is very high (that means actual execution time is lower in comparison to the total execution time and increasing parallelism might help achieve efficiency)

o   Both Execute latency and Process latency are high (again, increasing parallelism might help)
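The three cases above can be written down as a small decision helper. This is only a sketch: the 50 ms threshold that separates "low" from "high" is a hypothetical value that has to be chosen per use case, and the function name is made up for illustration.

```python
def interpret_latencies(execute_ms, process_ms, high_ms=50.0):
    """Map execute and process latency to the cases listed above."""
    execute_high = execute_ms >= high_ms
    process_high = process_ms >= high_ms
    if not execute_high and not process_high:
        return "both low: best possible case"
    if not execute_high and process_high:
        return "execute low, process high: increasing parallelism might help"
    if execute_high and process_high:
        return "both high: increasing parallelism might help"
    return "execute high, process low: not covered by the cases above"


if __name__ == "__main__":
    print(interpret_latencies(2.0, 5.0))
    print(interpret_latencies(2.0, 400.0))
    print(interpret_latencies(120.0, 400.0))
```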

Storm logs

The next place to debug if things don't go as expected is the Storm logs. First of all, one needs to know the location of the Storm logs. This path can be updated in cluster.xml, found at storm-0.9.2-incubating.zip\apache-storm-0.9.2-incubating\logback\cluster.xml:

<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="A1">
  <!-- update this as below  <file>${storm.home}/logs/${logfile.name}</file> -->
  <file>/mnt/app_logs/storm/storm_logs/${logfile.name}</file>
  <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
    <fileNamePattern>${storm.home}/logs/${logfile.name}.%i</fileNamePattern>
    <minIndex>1</minIndex>
    <maxIndex>9</maxIndex>
  </rollingPolicy>
  <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
    <maxFileSize>100MB</maxFileSize>
  </triggeringPolicy>
  <encoder>
    <pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
  </encoder>
</appender>

The <file> element in the preceding snippet gives the path/location where the Storm logs will be created. Let's take a closer look to find out what kinds of logs are created by the different Storm daemons.

The Nimbus node logs can be obtained by using the following commands on shell:

cd /mnt/app_logs/storm/storm_logs
ls -lart

The listing of the Nimbus log directory is shown in the following screenshot:

Storm logs

Notice that we have nimbus.log, which has details about Nimbus' startup, error, and info logs; ui.log is created on the node where we start the Storm UI application.

The logs on the supervisor nodes can be obtained by using the following commands on shell:

cd /mnt/app_logs/storm/storm_logs
ls -lart

The listing of the supervisor log directory is shown in the following screenshot:

Storm logs

One can see supervisor logs and worker logs. The supervisor logs capture the details about the supervisor starting up, any errors, and so on. The worker logs are the ones where the developer's topology logs appear along with Storm logs for various bolts and spouts.

So if we want to troubleshoot the Storm daemon processes, we would look at nimbus.log and supervisor.log. If you're having issues with a topology, then you need to debug using the corresponding worker log. The scenario of Nimbus and worker node failures has been covered in Chapter 4, Storm in a Clustered Mode.

Now let's imagine a scenario. I am a developer whose topology is not behaving as expected, and I suspect that one of the bolts is not functioning correctly. So we need to debug the worker logs and find the root cause. First, we need to find out which worker log to look at among multiple supervisors and numerous worker logs; we'll get this information from the Storm UI. Perform the following steps:

1.    Open Storm UI and click on the troublesome topology.

2.    Click on the suspected bolt or spout of the topology. A screen analogous to what is shown in this screenshot should appear:

Storm logs

Here is the clue to debug what's happening in this bolt: I will look into worker-6705.log on supervisor5 and supervisor6.
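Once the host and port are known from the UI, locating the right file and pulling out the errors is mechanical. The sketch below assumes the log directory configured earlier in cluster.xml and the worker-<port>.log naming seen above; the sample log lines and both function names are made up for illustration, with the lines following the pattern from the encoder configuration.

```python
import posixpath


def worker_log_path(port, log_dir="/mnt/app_logs/storm/storm_logs"):
    """Build the path of the worker log for a given slot port."""
    return posixpath.join(log_dir, f"worker-{port}.log")


def error_lines(lines):
    """Keep only the lines logged at ERROR level (the pattern prints the level as [ERROR])."""
    return [line.rstrip("\n") for line in lines if "[ERROR]" in line]


if __name__ == "__main__":
    print(worker_log_path(6705))
    sample = [
        "2015-01-10 10:15:01 worker [INFO] Worker has started\n",
        "2015-01-10 10:15:07 MyBolt [ERROR] Failed to process tuple\n",
    ]
    for line in error_lines(sample):
        print(line)
```

On the supervisor machine itself, error_lines can be fed an open file handle for the path returned by worker_log_path.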

Quiz time

Q.1. State whether the following statements are true or false:

1.    Storm nodes can't be added to the cluster with topologies being executed.

2.    A topology can't survive the Storm node failure.

3.    Storm logs are created on each node in the cluster.

4.    The location of the Storm log creation is configurable.

Q.2. Fill in the blanks:

1.    _______________ is the heartbeat tracker of the cluster.

2.    _______________ is the daemon that's mandatory for topology submission and rebalancing.

3.    The ___________ file holds the worker configuration for the topology.

Q.3. Execute the following use cases to see the internals of Storm:

1.    Start nimbus and check nimbus.log to see what a successful startup should look like.

2.    Start the supervisor and check supervisor.log to see what a successful startup should look like.

3.    Submit the topology, say a simple WordCount topology, and figure out the worker.log file creation.

4.    Update the logging configuration (logback/cluster.xml in this version of Storm) to change the logging level and verify its impact.

Summary

In this chapter, we have covered the maintenance concepts of Storm in terms of adding new nodes, rebalancing, and killing topologies. We have understood and tweaked internals such as numtasks and parallelism in combination with numworkers and network latency. You learned to locate and decipher the logs of Storm components. You also understood the metrics of the Storm UI and their implications for topology performance.

In the next chapter, we will discuss advanced concepts of Storm, including micro-batching and Trident APIs.