Apache Flume: Distributed Log Collection for Hadoop, Second Edition (2015)

Chapter 3. Channels

In Flume, a channel is the construct used between sources and sinks. It provides a buffer for your in-flight events after they are read from sources until they can be written to sinks in your data processing pipelines.

The primary types we'll cover here are a memory-backed/nondurable channel and a local-filesystem-backed/durable channel. Starting with Flume 1.5, an experimental hybrid memory and file channel called the Spillable Memory Channel is introduced. The durable file channel flushes all changes to disk before acknowledging the receipt of the event to the sender. This is considerably slower than using the nondurable memory channel, but it provides recoverability in the event of system or Flume agent restarts. Conversely, the memory channel is much faster, but failure results in data loss and it has much lower storage capacity when compared to the multiterabyte disks backing the file channel. This is why the Spillable Memory Channel was created. In theory, you get the benefits of memory speed until the memory fills up due to flow backpressure. At this point, the disk will be used to store the events—and with that comes much larger capacity. There are trade-offs here as well, as performance is now variable depending on how the entire flow is performing. Ultimately, the channel you choose depends on your specific use cases, failure scenarios, and risk tolerance.

That said, regardless of what channel you choose, if your rate of ingest from the sources into the channel is greater than the rate at which the sink can write data, you will exceed the capacity of the channel and throw a ChannelException. What your source does or doesn't do with that ChannelException is source-specific, but in some cases, data loss is possible, so you'll want to avoid filling channels by sizing things properly. In fact, you always want your sink to be able to write faster than your source input. Otherwise, you might get into a situation where once your sink falls behind, you can never catch up. If your data volume tracks with the site usage, you can have higher volumes during the day and lower volumes at night, giving your channels time to drain. In practice, you'll want to try and keep the channel depth (the number of events currently in the channel) as low as possible because time spent in the channel translates to a time delay before reaching the final destination.

The memory channel

A memory channel, as expected, is a channel where in-flight events are stored in memory. As memory is (usually) orders of magnitude faster than the disk, events can be ingested much more quickly, resulting in reduced hardware needs. The downside of using this channel is that an agent failure (hardware problem, power outage, JVM crash, Flume restart, and so on) results in the loss of data. Depending on your use case, this might be perfectly fine. System metrics usually fall into this category, as a few lost data points isn't the end of the world. However, if your events represent purchases on your website, then a memory channel would be a poor choice.

To use the memory channel, set the type parameter on your named channel to memory.

agent.channels.c1.type=memory

This defines a memory channel named c1 for the agent named agent.

Here is a table of configuration parameters you can adjust from the default values:

Key

Required

Type

Default

type

Yes

String

memory

capacity

No

int

100

transactionCapacity

No

int

100

byteCapacityBufferPercentage

No

int (percent)

20%

byteCapacity

No

long (bytes)

80% of JVM Heap

keep-alive

No

int

3 (seconds)

The default capacity of this channel is 100 events. This can be adjusted by setting the capacity property as follows:

agent.channels.c1.capacity=200

Remember that if you increase this value, you will most likely have to increase your Java heap space using the -Xmx, and optionally -Xms, parameters.

Another capacity-related setting you can set is transactionCapacity. This is the maximum number of events that can be written, also called a put, by a source's ChannelProcessor, the component responsible for moving data from the source to the channel, in a single transaction. This is also the number of events that can be read, also called a take, in a single transaction by the SinkProcessor, which is the component responsible for moving data from the channel to the sink. You might want to set this higher in order to decrease the overhead of the transaction wrapper, which might speed things up. The downside to increasing this is that a source would have to roll back more data in the event of a failure.

Tip

Flume only provides transactional guarantees for each channel in each individual agent. In a multiagent, multichannel configuration, duplicates and out-of-order delivery are likely but should not be considered the norm. If you are getting duplicates in nonfailure conditions, it means that you need to continue tuning your Flume configurations.

If you are using a sink that writes some place that benefits from larger batches of work (such as HDFS), you might want to set this higher. Like many things, the only way to be sure is to run performance tests with different values. This blog post from Flume committer Mike Percy should give you some good starting points: http://bit.ly/flumePerfPt1.

The byteCapacityBufferPercentage and byteCapacity parameters were introduced in https://issues.apache.org/jira/browse/FLUME-1535 as a means to size the memory channel capacity using the number of bytes used rather than the number of events as well as trying to avoid OutOfMemoryErrors. If your events have a large variance in size, you might be tempted to use these settings to adjust the capacity, but be warned that calculations are estimated from the event's body only. If you have any headers, which you will, your actual memory usage will be higher than the configured values.

Finally, the keep-alive parameter is the time the thread writing data into the channel will wait when the channel is full, before giving up. As data is being drained from the channel at the same time, if space opens up before the timeout expires, the data will be written to the channel rather than throwing an exception back to the source. You might be tempted to set this value very high, but remember that waiting for a write to a channel will block the data flowing into your source, which might cause data to back up in an upstream agent. Eventually, this might result in events being dropped. You need to size for periodic spikes in traffic as well as temporary planned (and unplanned) maintenance.

The file channel

A file channel is a channel that stores events to the local filesystem of the agent. Though it's slower than the memory channel, it provides a durable storage path that can survive most issues and should be used in use cases where a gap in your data flow is undesirable.

This durability is provided by a combination of a Write Ahead Log (WAL) and one or more file storage directories. The WAL is used to track all input and output from the channel in an atomically safe way. This way, if the agent is restarted, the WAL can be replayed to make sure all the events that came into the channel (puts) have been written out (takes) before the stored data can be purged from the local filesystem.

Additionally, the file channel supports the encryption of data written to the filesystem if your data handling policy requires that all data on the disk (even temporarily) be encrypted. I won't cover this here, but should you need it, there is an example in the Flume User Guide (http://flume.apache.org/FlumeUserGuide.html). Keep in mind that using encryption will reduce the throughput of your file channel.

To use the file channel, set the type parameter on your named channel to file.

agent.channels.c1.type=file

This defines a file channel named c1 for the agent named agent.

Here is a table of configuration parameters you can adjust from the default values:

Key

Required

Type

Default

type

Yes

String

file

checkpointDir

No

String

~/.flume/file-channel/checkpoint

useDualCheckpoints

No

boolean

false

backupCheckpointDir

No

String

No default, but must be different from checkpointDir

dataDirs

No

String (comma separated list)

~/.flume/file-channel/data

capacity

No

int

1000000

keep-alive

No

int

3 (seconds)

transactionCapacity

No

int

10000

checkpointInterval

No

long

30000 (milliseconds)

maxFileSize

No

long

2146435071 (bytes)

minimumRequiredSpace

No

long

524288000 (bytes)

To specify the location where the Flume agent should hold data, set the checkpointDir and dataDirs properties:

agent.channels.c1.checkpointDir=/flume/c1/checkpoint

agent.channels.c1.dataDirs=/flume/c1/data

Technically, these properties are not required and have sensible default values for development. However, if you have more than one file channel configured in your agent, only the first channel will start. For production deployments and development work with multiple file channels, you should use distinct directory paths for each file channel storage area and consider placing different channels on different disks to avoid IO contention. Additionally, if you are sizing a large machine, consider using some form of RAID that contains striping (RAID 10, 50, or 60) to achieve higher disk performance rather than buying more expensive 10K or 15K drives or SSDs. If you don't have RAID striping but have multiple disks, set dataDirs to a comma-separated list of each storage location. Using multiple disks will spread the disk traffic almost as well as striped RAID but without the computational overhead associated with RAID 50/60 as well as the 50 percent space waste associated with RAID 10. You'll want to test your system to see whether the RAID overhead is worth the speed difference. As hard drive failures are a reality, you might prefer certain RAID configurations to single disks in order to protect yourself from the data loss associated with single drive failures. RAID 6 across the maximum number of disks can provide the highest performance for the minimal amount of data protection when combined with redundant and reliable power sources (such as an uninterruptable power supply or UPS).

Using the JDBC channel is a bad idea as it would introduce a bottleneck and single point of failure in what should be designed as a highly distributed system. NFS storage should be avoided for the same reason.

Tip

Be sure to set HADOOP_PREFIX and JAVA_HOME environment variables when using the file channel. While we seemingly haven't used anything Hadoop-specific (such as writing to HDFS), the file channel uses Hadoop Writables as an on-disk serialization format. If Flume can't find the Hadoop libraries, you might see this in your startup, so check your environment variables:

java.lang.NoClassDefFoundError: org/apache/hadoop/io/Writable

Starting with Flume 1.4, the file channel supports a secondary checkpoint directory. In situations where a failure occurs while writing the checkpoint data, that information could become unusable and a full replay of the logs is necessary in order to recover the state of the channel. As only one checkpoint is updated at a time before flipping to the other, one should always be in a consistent state, thus shortening restart times. Without valid checkpoint information, the Flume agent can't know what has been sent and what has not been sent in dataDirs. As files in the data directories might contain large amounts of data already sent but not yet deleted, a lack of checkpoint information would result in a large number of records being resent as duplicates.

Incoming data from sources is written and acknowledged at the end of the most current file in the data directory. The files in the checkpoint directory keep track of this data when it's taken by a sink.

If you want to use this feature, set the useDualCheckpoints property to true and specify a location for that second checkpoint directory with the backupCheckpointDir property. For performance reasons, it is always preferred that this be on a different disk from the other directories used by the file channel:

agent.channels.c1.useDualCheckpoints=true

agent.channels.c1.backupCheckpointDir=/flume/c1/checkpoint2

The default file channel capacity is one million events regardless of the size of the event contents. If the channel capacity is reached, a source will no longer be able to ingest the data. This default should be fine for low volume cases. You'll want to size this higher if your ingestion is so heavy that you can't tolerate normal planned or unplanned outages. For instance, there are many configuration changes you can make in Hadoop that require a cluster restart. If you have Flume writing important data into Hadoop, the file channel should be sized to tolerate the time it takes to restart Hadoop (and maybe add a comfort buffer for the unexpected). If your cluster or other systems are unreliable, you can set this higher still to handle even larger amounts of downtime. At some point, you'll run into the fact that your disk space is a finite resource, so you will have to pick some upper limit (or buy bigger disks).

The keep-alive parameter is similar to memory channels. It is the maximum time the source will wait when trying to write into a full channel before giving up. If space becomes available before the timeout, the write is successful; otherwise, ChannelException is thrown back to the source.

The transactionCapacity property is the maximum number of events allowed in a single transaction. This might become important for certain sources that batch together events and pass them to the channel in a single call. Most likely, you won't need to change this from the default. Setting this higher allocates additional resources internally, so you shouldn't increase it unless you run into performance issues.

The checkpointInterval property is the number of milliseconds between performing a checkpoint (which also rolls the log files written to logDirs). If you do not set this, 30 seconds will be used.

Checkpoint files also roll based on the volume of data written to them using the maxFileSize property. You can lower this value for low traffic channels if you want to try and save some disk space. Let's say your maximum file size is 50,000 bytes but your channel only writes 500 bytes a day; it would take 100 days to fill a single log. Let's say that you were on day 100 and 2000 bytes came in all at once. Some data would be written to the old file and a new file would be started with the overflow. After the roll, Flume tries to remove any log files that aren't needed anymore. As the full log has unprocessed records, it cannot be removed yet. The next chance to clean up that old log file might not come for another 100 days. It probably doesn't matter if that old 50,000 byte file sticks around longer, but as the default is around 2 GB, you could have twice that (4 GB) disk space used per channel. Depending on how much disk you have available and the number of channels configured in your agent, this might or might not be a problem. If your machines have plenty of storage space, the default should be fine.

Finally, the minimumRequiredSpace property is the amount of space you do not want to use for writing logs. The default configuration will throw an exception if you attempt to use the last 500 MB of the disk associated with the dataDir path. This limit applies across all channels, so if you have three file channels configured, the upper limit is still 500 MB and not 1.5 GB. You can set this value as low as 1 MB, but generally speaking, bad things tend to happen when you push disk utilization towards 100 percent.

Spillable Memory Channel

Introduced in Flume 1.5, the Spillable Memory Channel is a channel that acts like a memory channel until it is full. At that point, it acts like a file channel that is configured with a much larger capacity than its memory counterpart but runs at the speed of your disks (which means orders of magnitude slower).

Note

The Spillable Memory Channel is still considered experimental. Use it at your own risk!

I have mixed feelings about this new channel type. On the surface, it seems like a good idea, but in practice, I can see problems. Specifically, having a variable channel speed that changes depending on how downstream entities in your data pipe behave makes for difficult capacity planning. As a memory channel is used under good conditions, this implies that the data contained in it can be lost. So why would I go through extra trouble to save some of it to the disk? The data is either very important for me to spool it to disk with a file-backed channel, or it's less important and can be lost, so I can get away with less hardware and use a faster memory-backed channel. If I really need memory speed but with the capacity of a hard drive, Solid State Drive (SSD) prices have come down enough in recent years for a file channel on SSD to now be a viable option for you rather than using this hybrid channel type. I do not use this channel myself for these reasons.

To use this channel configuration, set the type parameter on your named channel to spillablememory:

agent.channels.c1.type=spillablememory

This defines a Spillable Memory Channel named c1 for the agent named agent.

Here is a table of configuration parameters you can adjust from the default values:

Key

Required

Type

Default

type

Yes

String

spillablememory

memoryCapacity

No

int

10000

overflowCapacity

No

int

100000000

overflowTimeout

No

int

3 (seconds)

overflowDeactivationThreshold

No

int

5 (percent)

byteCapacityBufferPercentage

No

int

20 (percent)

byteCapacity

No

long (bytes)

80% of JVM Heap

checkpointDir

No

String

~/.flume/file-channel/checkpoint

dataDirs

No

String (comma separated list)

~/.flume/file-channel/data

useDualCheckpoints

No

boolean

false

backupCheckpointDir

No

String

No default, but must be different than checkpointDir

transactionCapacity

No

int

10000

checkpointInterval

No

long

30000 (milliseconds)

maxFileSize

No

long

2146435071 (bytes)

minimumRequiredSpace

No

long

524288000 (bytes)

As you can see, many of the fields match against the memory channel and file channel's properties, so there should be no surprises here. Let's start with the memory side.

The memoryCapacity property determines the maximum number of events held in the memory (this was just called capacity for the memory channel but was renamed here to avoid ambiguity). Also, the default value, if unspecified, is 10,000 records instead of 100. If you wanted to double the default capacity, the configuration might look something like this:

agent.channels.c1.memoryCapacity=20000

As mentioned previously, you will most likely need to increase the Java heap space allocated using the -Xmx and -Xms parameters. Like most Java programs, more memory usually helps the garbage collector run more efficiently, especially as an application such as Flume generates a lot of short-lived objects. Be sure to do your research and pick an appropriate JVM garbage collector based on your available hardware.

Tip

You can set memoryCapacity to zero, which effectively turns it into a file channel. Don't do this. Just use the file channel and be done with it.

The overflowCapacity property determines the maximum number of events that can be written to the disk before an error is thrown back to the source feeding it. The default value is a generous 100,000,000 events (far larger than the file channel's default of 100,000). Most servers nowadays have multiterabyte disks, so space should not be a problem, but do the math against your average event size to be sure you don't fill your disks by accident. If your channels are filling this much, you are probably dealing with another issue downstream and the last thing you need is your data buffer layer filling up completely. For example, if you had a large 1 megabyte event payload, 100 million of these add up to 100 terabytes, which is probably bigger than the disk space on an average server. A 1 kilobyte payload would only take 100 gigabytes, which is probably fine. Just do the math ahead of time so you are not surprised.

Tip

You can set overflowCapacity to zero, which effectively turns it into a memory channel. Don't do this. Just use the memory channel and be done with it.

The transactionCapacity property adjusts the batch size of events written to a channel in a single transaction. If this is set too low, it will lower the throughput of the agent on high volume flows because of the transaction overhead. For a high volume channel, you will probably need to set this higher, but the only way to be sure is to test your particular workflow. See Chapter 8Monitoring Flume, to learn how to accomplish this.

Finally, the byteCapacityBufferPercentage and byteCapacity parameters are identical in functionality and defaults to the memory channel, so I won't waste your time repeating it here.

What is important is the overflowTimeout property. This is the number of seconds after which the memory part of the channel fills before data starts getting written to the disk-backed portion of the channel. If you want writes to start occurring immediately, you can set this to zero. You might wonder why you need to wait before starting to write to the disk portion of the channel. This is where the undocumented overflowDeactivationThreshold property comes into play. This is the amount of time that space has to be available in the memory path before it can switch back from disk writing. I believe this is an attempt to prevent flapping back and forth between the two. Of course, there really are no ordering guarantees in Flume, so I don't know why you would choose to append to the disk buffer if a spot is available in faster memory. Perhaps they are trying to avoid some kind of starvation condition, although the code appears to attempt to remove events in the order of arrival even when using both memory and disk queues. Perhaps it will be explained to us should it ever come out of experimental status.

Note

The overflowDeactivationThreshold property is stated to be for internal use only, so adjust it at your own peril. If you are considering it, be sure to get familiar with the source code so that you understand the implications of altering the default.

The rest of the properties on this channel are identical in name and functionality to its file channel counterpart, so please refer to the previous section.

Summary

In this chapter, we covered the two channel types you are most likely to use in your data processing pipelines.

The memory channel offers speed at the cost of data loss in the event of failure. Alternatively, the file channel provides a more reliable transport in that it can tolerate agent failures and restarts at a performance cost.

You will need to decide which channel is appropriate for your use cases. When trying to decide whether a memory channel is appropriate, ask yourself what the monetary cost is if you lose some data. Weigh that against the additional costs of more hardware to cover the difference in performance when deciding if you need a durable channel after all. Another consideration is whether or not the data can be resent. Not all data you might ingest into Hadoop will come from streaming application logs. If you receive "daily downloads" of data, you can get away with using a memory channel because if you encounter a problem, you can always rerun the import.

Finally, we covered the experimental Spillable Memory Channel. Personally, I think its creation is a bad idea, but like most things in computer science, everybody has an opinion on what is good or bad. I feel that the added complexity and nondeterministic performance make for difficult capacity planning, as you should always size things for the worst-case scenario. If your data is critical enough for you to overflow to the disk rather than discard the events, then you aren't going to be okay with losing even the small amount held in memory.

In the next chapter, we'll look at sinks, specifically, the HDFS sink to write events to HDFS, the Elastic Search sink to write events to Elastic Search, and the Morphline Solr sink to write events to Solr. We will also cover Event Serializers, which specify how Flume events are translated into output that's more suitable for the sink. Finally, we will cover sink processors and how to set up load balancing and failure paths in a tiered configuration for more robust data transport.