Apache Flume: Distributed Log Collection for Hadoop, Second Edition (2015)

Chapter 9. There Is No Spoon - the Realities of Real-time Distributed Data Collection

In this last chapter, I thought we should cover some of the less concrete, random thoughts I have around data collection into Hadoop. There's no hard science behind some of this, and you should feel perfectly alright to disagree with me.

While Hadoop is a great tool to consume vast quantities of data, I often think of a picture of the logjam that occurred in 1886 in the St. Croix River in Minnesota (http://www.nps.gov/sacn/historyculture/stories.htm). When dealing with too much data, you want to make sure you don't jam your river. Be sure to take the previous chapter on monitoring seriously and not just as nice-to-have information.

Transport time versus log time

I had a situation where data was being placed using date patterns in the filename and/or the path in HDFS didn't match the contents of the directories. The expectation was that the data in the 2014/12/29 directory path contained all the data for December 29, 2014. However, the reality was that the date was being pulled from the transport. It turns out that the version of syslog we were using was rewriting the header, including the date portion, causing the data to take on the transport time and not reflect the original time of the record. Usually, the offsets were tiny, just a second or two, so nobody really took notice. However, one day, one of the relay servers died and when the data that had got stuck on upstream servers was finally sent, it had the current time. In this case, it was shifted by a couple of days, causing a significant data cleanup effort.

Be sure this isn't happening to you if you are placing data by date. Check the date edge cases to see that they are what you expect, and make sure you test your outage scenarios before they happen for real in production.

As I mentioned previously, these retransmits due to planned or unplanned maintenance (or even a tiny network hiccup) will most likely cause duplicate and out-of-order events to arrive, so be sure to account for this when processing raw data. There are no single delivery or ordering guarantees in Flume. If you need that, use a transactional database or distributed transaction log such as Apache Kafka (http://kafka.apache.org/) instead. Of course, if you are going to use Kafka, you would probably only use Flume for the final leg of your data path, with your source consuming events from Kafka (https://github.com/baniuyao/flume-ng-kafka-source).


Remember that you can always work around duplicates in your data at query time as long as you can uniquely identify your events from one another. If you cannot distinguish events easily, you can add a Universally Unique Identifier (UUID) (http://en.wikipedia.org/wiki/Universally_unique_identifier) header using the bundled interceptor, UUIDInterceptor (configuration details are in the Flume User Guide).

Time zones are evil

In case you missed my bias against using local time in Chapter 4Sinks and Sink Processors, I'll repeat it here a little stronger: time zones are evil—evil like Dr. Evil (http://en.wikipedia.org/wiki/Dr._Evil)—and let's not forget about his Mini Me counterpart, (http://en.wikipedia.org/wiki/Mini-Me)—Daylight Savings Time.

We live in a global world now. You are pulling data from all over the place into your Hadoop cluster. You may even have multiple data centers in different parts of the country (or the world). The last thing you want to be doing while trying to analyze your data is to deal with askew data. Daylight Savings Time changes at least somewhere on Earth a dozen times in a year. Just look at the history: ftp://ftp.iana.org/tz/releases/. Save yourself the headache and just normalize it to UTC. If you want to convert it to "local time" on its way to human eyeballs, feel free. However, while it lives in your cluster, keep it normalized to UTC.


Consider adopting UTC everywhere via this Java startup parameter (if you can't set it system-wide): -Duser.timezone=UTC

Also, use the ISO 8601 (http://en.wikipedia.org/wiki/ISO_8601) time standard where possible and be sure to include time zone information (even if it is UTC). Every modern tool on the planet supports this format and will save you pain down the road.

I live in Chicago, and our computers at work use Central Time, which adjusts for daylight savings. In our Hadoop cluster, we like to keep data in a YYYY/MM/DD/HH directory layout. Twice a year, some things break slightly. In the fall, we have twice as much data in our 2 a.m. directory. In the spring, there is no 2 a.m. directory. Madness!

Capacity planning

Regardless of how much data you think you have, things will change over time. New projects will pop up and data creation rates for your existing projects will change (up or down). Data volume will usually ebb and flow with the traffic of the day. Finally, the number of servers feeding your Hadoop cluster will change over time.

There are many schools of thought on how much extra storage capacity you should keep in your Hadoop cluster (we use the totally unscientific value of 20 percent, which means that we usually plan for 80 percent full when ordering additional hardware but don't start to panic until we hit the 85-90 percent utilization number). Generally, you want to keep enough extra space so that the failure and/or maintenance of a server or two won't cause the HDFS block replication to consume all the remaining space.

You may also need to set up multiple flows inside a single agent. The source and sink processors are currently single-threaded, so there is some limit to what tuning batch sizes can accomplish when under heavy data volumes. Be very careful in these situations where you split your data flow at the source using a replicating channel selector to multiple channels/sinks. If one of the path's channels fills up, an exception is thrown back to the source. If that full channel is not marked as optional and the data is dropped, the source will stop consuming new data. This effectively jams the agent for all other channels attached to that source. You may not want to drop the data (marking the channel as optional) because the data is important. Unfortunately, this is the only fan-out mechanism provided in Flume to send to multiple destinations, so make sure you catch issues quickly so that all your data flows are not impaired due to a cascade backup of events.

For a number of Flume agents feeding Hadoop, this too should be adjusted based on real numbers. Watch the channel size to see how well the writes are keeping up under normal loads. Adjust the maximum channel capacity to handle whatever amount of overhead makes you feel good. You can always purchase way more hardware than you need, but even a prolonged outage may overflow even the most conservative estimates. This is when you have to pick and choose which data is more important to you and adjust your channel capacities to reflect that. This way, if you exceed your limits, the least important data will be the first to be dropped.

Chances are your company doesn't have an infinite amount of money and at some point, the value of the data versus the cost of continuing to expand your cluster will start to be questioned. This is why setting limits on the volume of data collected is very important. This is just one aspect of your data retention policy, where cost is the driving factor. In a moment, we'll discuss some of the compliance aspects of this policy. Suffice to say, any project sending data into Hadoop should be able to say what the value of that data is and what the loss is if we delete the older stuff. This is the only way the people writing the checks can make an informed decision.

Considerations for multiple data centers

If you run your business out of multiple data centers and have a large volume of data collected, you may want to consider setting up a Hadoop cluster in each data center rather than sending all your collected data back to a single data center. There may be regulatory implications regarding data crossing certain geographic boundaries. Chances are there is somebody in your company who knows much more about compliance than you or I, so seek them out before you start copying data across borders. Of course, not collating your data will make it more difficult to analyze it, as you can't just run one MapReduce job against all the data. Instead, you would have to run parallel jobs and then combine the results in a second pass. Adjusting your data processing procedures is better than potentially breaking the law. Be sure to do your homework.

Pulling all your data into a single cluster may also be more than your networking can handle. Depending on how your data centers are connected to each other, you simply may not be able to transmit the desired volume of data. If you use public cloud services, there are surely data transfer costs between data centers. Finally, consider that a complete cluster failure or corruption may wipe out everything, as most clusters are usually too big to back up everything except high value data. Having some of the old data in this case is sometimes better than having nothing. With multiple Hadoop clusters, you have the ability to use a FailoverSinkProcessor to forward data to a different cluster if you don't want to wait to send to the local one.

If you do choose to send all your data to a single destination, consider adding a large disk capacity machine as a relay server for the data center. This way, if there is a communication issue or extended cluster maintenance, you can let data pile up on a machine that's different from the ones trying to service your customers. This is sound advice even in a single data center situation.

Compliance and data expiry

Remember that the data your company is collecting from your customers should be considered sensitive information. You may be bound by additional regulatory limitations on accessing data such as:

·        Personally identifiable information (PII): How you handle and safeguard customer's identities http://en.wikipedia.org/wiki/Personally_identifiable_information

·        Payment Card Industry Data Security Standard (PCI DSS): How you safeguard credit card information http://en.wikipedia.org/wiki/PCI_DSS

·        Service Organization Control (SOC-2): How you control access to information/systems http://www.aicpa.org/InterestAreas/FRC/AssuranceAdvisoryServices/Pages/AICPASOC2Report.aspx

·        Statements on Standards for Attestation Engagements (SSAE-16): How you manage changes http://www.aicpa.org/Research/Standards/AuditAttest/DownloadableDocuments/AT-00801.pdf

·        Sarbanes Oxley (SOX): http://en.wikipedia.org/wiki/Sarbanes%E2%80%93Oxley_Act

This is by no means a definitive list, so be sure to seek out your company's compliance experts for what does and doesn't apply to your situation. If you aren't properly handling access to this data in your cluster, the government will lean on you, or worse, you won't have customers anymore if they feel you aren't protecting their personal information. Consider scrambling, trimming, or obfuscating your data of personal information. Chances are the business insight you are looking falls more into the category of "how many people who search for "hammer" actually buy one?" rather than "how many customers are named Bob?" As you saw in Chapter 6Interceptors, ETL, and Routing, it would be very easy to write an interceptor to obfuscate PII as you move it around.

Your company probably has a document retention policy that includes the data you are putting into Hadoop. Make sure you remove data that your policy says you aren't supposed to be keeping around anymore. The last thing you want is a visit from the lawyers.


In this chapter, we covered several real-world considerations you need to think about when planning your Flume implementation, including:

·        Transport time does not always match event time

·        The mayhem introduced with Daylight Savings Time to certain time-based logic

·        Capacity planning considerations

·        Items to consider when you have more than one data center

·        Data compliance

·        Data retention and expiration

I hope you enjoyed this book. Hopefully, you will be able to apply much of this information directly in your application/Hadoop integration efforts.

Thanks, this was fun!