Apache Flume: Distributed Log Collection for Hadoop, Second Edition (2015)

Chapter 5. Sources and Channel Selectors

Now that we have covered channels and sinks, we will cover some of the more common ways to get data into your Flume agents. As discussed in Chapter 1, Overview and Architecture, the source is the input point for the Flume agent. Many sources ship with the Flume distribution, and many more are available as open source. Like most open source software, if you can't find what you need, you can always write your own by extending the org.apache.flume.source.AbstractSource class. Since the primary focus of this book is ingesting log files into Hadoop, we'll cover a few of the sources best suited to that task.

The problem with using tail

If you have used any of the Flume 0.9 releases, you'll notice that the TailSource is no longer a part of Flume. TailSource provided a mechanism to "tail" (http://en.wikipedia.org/wiki/Tail_(Unix)) any file on the system and create Flume events for each line of the file. It could also handle file rotations, so many used the filesystem as a handoff point between the application creating the data (for instance, log4j) and the mechanism responsible for moving those files someplace else (for instance, syslog).

As is the case with both channels and sinks, events are added to and removed from a channel as part of a transaction. When you are tailing a file, there is no way to participate properly in a transaction. If a write to the channel failed, or if the channel was simply full (a more likely event than failure), the data couldn't be "put back" as rollback semantics dictate.

Furthermore, if data is written to a file faster than Flume can read it, it is possible to lose one or more entire log files of input. For example, say you were tailing /var/log/app.log. When that file reaches a certain size, it is rotated (renamed) to /var/log/app.log.1, and a new file called /var/log/app.log is created. Let's say you had a favorable review in the press and your application is logging much more than usual. Flume may still be reading from the rotated file (/var/log/app.log.1) when another rotation occurs, moving /var/log/app.log to /var/log/app.log.1. The file Flume is reading is now renamed to /var/log/app.log.2. When Flume finishes with this file, it will move to what it thinks is the next file (/var/log/app.log), thus skipping the file that now resides at /var/log/app.log.1. This kind of data loss would go completely unnoticed and is something we want to avoid if possible.


For these reasons, it was decided to remove the tail functionality from Flume when it was refactored. There are some workarounds for the lack of TailSource, but note that no workaround can eliminate the possibility of data loss under these high-load conditions.

The Exec source

The Exec source provides a mechanism to run a command outside Flume and then turn the output into Flume events. To use the Exec source, set the type property to exec:

agent.sources.s1.type=exec

All sources in Flume are required to specify the list of channels to write events to using the channels (plural) property. This is a space-separated list of one or more channel names:

agent.sources.s1.channels=c1

The only other required parameter is the command property, which tells Flume what command to pass to the operating system. Here is an example of the use of this property:

agent.sources=s1

agent.sources.s1.channels=c1

agent.sources.s1.type=exec

agent.sources.s1.command=tail -F /var/log/app.log

Here, I have configured a single source, s1, for an agent named agent. The source, an Exec source, will tail the /var/log/app.log file and follow any rotations that outside applications may perform on that log file. All events are written to the c1 channel. This is one of the workarounds for the lack of TailSource in Flume 1.x. It is not my preferred way to tail files, just a simple example of the Exec source type. I will show my preferred method in Chapter 7, Pulling It All Together.

Note

Should you use the tail -F command in conjunction with the Exec source, the forked process will probably not shut down every time the Flume agent shuts down or restarts. This will leave orphaned tail processes that will never exit. The tail -F command, by definition, has no end. Even if you delete the file being tailed (at least in Linux), the running tail process will keep the file handle open indefinitely. This keeps the file's space from actually being reclaimed until the tail process exits, which won't happen. I think you are beginning to see why Flume developers don't like tailing files.

If you go this route, be sure to periodically scan the process table for tail -F processes whose parent PID is 1. These are effectively dead processes and need to be killed manually.
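If you would rather script that check than eyeball ps output, here is a minimal sketch using the Java 9+ ProcessHandle API. It assumes that "orphaned" means a parent PID of 1 and that the command line contains tail -F; the class and method names are illustrative. On systems where a subreaper (rather than PID 1) adopts orphans, the parent test would need adjusting, and command lines of other users' processes may be hidden unless you run it as the same user as the agent (or as root).

```java
import java.util.List;
import java.util.stream.Collectors;

// Lists processes that look like orphaned "tail -F" children of a Flume Exec source.
// Assumption: "orphaned" means the parent PID is 1; a subreaper may adopt orphans instead.
final class OrphanedTailScan {

    /** True when the parent PID and command line match the orphaned tail -F pattern. */
    static boolean isOrphanedTail(long parentPid, String commandLine) {
        return parentPid == 1
                && commandLine != null
                && commandLine.matches(".*\\btail\\s+-F\\b.*");
    }

    /** Scans every process visible to this JVM (command lines may be hidden for other users). */
    static List<ProcessHandle> findOrphans() {
        return ProcessHandle.allProcesses()
                .filter(ph -> isOrphanedTail(
                        ph.parent().map(ProcessHandle::pid).orElse(-1L),
                        ph.info().commandLine().orElse(null)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        for (ProcessHandle ph : findOrphans()) {
            // Report only; confirm before calling ph.destroy() on anything.
            System.out.println("pid=" + ph.pid() + " " + ph.info().commandLine().orElse("?"));
        }
    }
}
```

The sketch only lists candidates. Kill them (for example, with ProcessHandle.destroy()) only after confirming that no running agent still owns them.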

Here is a list of other properties you can use with the Exec source:

Key              Required  Type                 Default
type             Yes       String               exec
channels         Yes       String               space separated list of channels
command          Yes       String
shell            No        String               shell command
restart          No        boolean              false
restartThrottle  No        long (milliseconds)  10000 (milliseconds)
logStdErr        No        boolean              false
batchSize        No        int                  20
batchTimeout     No        long                 3000 (milliseconds)

Not every command keeps running, either because it fails (for example, when the channel it is writing to is full) or because it is designed to exit immediately. In this example, we want to record the system load via the Linux uptime command, which prints out some system information to stdout and exits:

agent.sources.s1.command=uptime

This command will immediately exit, so you can use the restart and restartThrottle properties to run it periodically:

agent.sources.s1.command=uptime

agent.sources.s1.restart=true

agent.sources.s1.restartThrottle=60000

This will produce one event per minute. In the tail example, should the channel fill, causing the Exec source to fail, you can use these properties to restart the Exec source. In this case, setting the restart property will restart tail on the current file, which re-emits lines that may already have been sent, thus producing duplicates. Depending on how long the restartThrottle property is, you may also have missed some data due to a file rotation outside Flume. Furthermore, the channel may still be unable to accept data, in which case the source will fail again. Setting restartThrottle too low gives the channel less time to drain, and unlike some of the sinks we saw, there is no option for exponential backoff.
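To make the restart and restartThrottle behavior concrete, here is a simplified model of the loop: run the command, turn each line of standard output into an event body, wait restartThrottle milliseconds after the process exits, and run it again. This is an illustrative sketch, not Flume's ExecSource code. The run count is capped so that it terminates, and stderr is discarded, mirroring the logStdErr default.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified model of restart=true plus restartThrottle (not Flume's ExecSource code).
// Each stdout line becomes one event body; stderr is discarded, like logStdErr=false.
final class ExecRestartSketch {

    static List<String> runRepeatedly(List<String> command, int runs, long restartThrottleMillis) {
        List<String> events = new ArrayList<>();
        for (int run = 0; run < runs; run++) {
            if (run > 0) {
                pause(restartThrottleMillis); // wait before restarting the exited command
            }
            try {
                Process process = new ProcessBuilder(command)
                        .redirectError(ProcessBuilder.Redirect.DISCARD)
                        .start();
                try (BufferedReader out = new BufferedReader(new InputStreamReader(
                        process.getInputStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = out.readLine()) != null) {
                        events.add(line); // one event per output line
                    }
                }
                process.waitFor();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return events;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Similar in spirit to command=uptime, restart=true, restartThrottle=60000 (three runs).
        runRepeatedly(List.of("uptime"), 3, 60_000).forEach(System.out::println);
    }
}
```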

If you need to use shell-specific features such as wildcard expansion, you can set the shell property as in this example:

agent.sources.s1.command=grep -i apache lib/*.jar | wc -l

agent.sources.s1.shell=/bin/bash -c

agent.sources.s1.restart=true

agent.sources.s1.restartThrottle=60000

This example will find the number of times the case-insensitive apache string is found in all the JAR files in the lib directory. Once per minute, that count will be sent as a Flume event payload.
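The difference the shell property makes is easy to demonstrate. In this sketch (hypothetical class and method names), a command run without a shell is split on whitespace and handed straight to the program, so *, |, and wc arrive as literal arguments. With a shell such as /bin/bash -c, the whole command string is one argument that the shell itself parses. Splitting on whitespace is an assumption for illustration, not a description of Flume's own parsing.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Shows why the shell property matters. Hypothetical helper, not Flume's code; splitting the
// command on whitespace when no shell is set is an assumption made for illustration.
final class ShellDemo {

    /** Runs argv directly (no shell involved) and returns trimmed stdout. */
    static String run(List<String> argv) {
        try {
            Process process = new ProcessBuilder(argv)
                    .redirectError(ProcessBuilder.Redirect.DISCARD)
                    .start();
            byte[] out = process.getInputStream().readAllBytes();
            process.waitFor();
            return new String(out, StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** No shell: "*" and "|" reach the program as literal arguments. */
    static String runWithoutShell(String command) {
        return run(Arrays.asList(command.trim().split("\\s+")));
    }

    /** With a shell such as "/bin/bash -c": the shell expands wildcards and builds the pipe. */
    static String runWithShell(String shell, String command) {
        List<String> argv = new ArrayList<>(Arrays.asList(shell.trim().split("\\s+")));
        argv.add(command); // the whole command is a single argument to the shell
        return run(argv);
    }

    public static void main(String[] args) {
        System.out.println(runWithShell("/bin/bash -c", "grep -i apache lib/*.jar | wc -l"));
    }
}
```

For example, with no shell, echo a b | wc -w just prints a b | wc -w, because echo receives the pipe and wc as ordinary arguments; run through /bin/sh -c, the same string prints 2.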

While the command output written to stdout becomes the Flume event body, errors are sometimes written to stderr. If you want these lines included in the Flume agent's system logs, set the logStdErr property to true. Otherwise, they will be silently ignored, which is the default behavior.

Finally, you can specify the number of events to write per transaction by changing the batchSize property. You may need to set this value higher than the default of 20 if your input data is large and you find that you cannot write to your channel fast enough. A higher batch size reduces the average transaction overhead per event. Testing with different values and monitoring the channel's put rate is the only way to know this for sure. The related batchTimeout property sets the maximum time to wait before flushing a partial batch (fewer records than batchSize) to the channel. The default is 3 seconds (specified in milliseconds).
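The interaction between batchSize and batchTimeout can be modeled in a few lines. In this sketch, a batch is committed as soon as it holds batchSize records, and a periodic check flushes a partial batch once batchTimeout milliseconds have passed since the last flush. The BatchBuffer class is hypothetical and simplified rather than Flume's implementation; time is passed in explicitly to keep the behavior deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified model of batchSize/batchTimeout (not Flume's ExecSource code).
// Callers pass the current time in milliseconds so the behavior is deterministic.
final class BatchBuffer {
    private final int batchSize;
    private final long batchTimeoutMillis;
    private final List<String> pending = new ArrayList<>();
    private long lastFlushMillis;

    BatchBuffer(int batchSize, long batchTimeoutMillis, long startMillis) {
        this.batchSize = batchSize;
        this.batchTimeoutMillis = batchTimeoutMillis;
        this.lastFlushMillis = startMillis;
    }

    /** Adds one line; returns a full batch to commit, or an empty list. */
    List<String> add(String line, long nowMillis) {
        pending.add(line);
        return pending.size() >= batchSize ? flush(nowMillis) : Collections.emptyList();
    }

    /** Periodic check; returns a partial batch once batchTimeout has elapsed since the last flush. */
    List<String> tick(long nowMillis) {
        boolean timedOut = nowMillis - lastFlushMillis >= batchTimeoutMillis;
        return timedOut && !pending.isEmpty() ? flush(nowMillis) : Collections.emptyList();
    }

    private List<String> flush(long nowMillis) {
        List<String> batch = new ArrayList<>(pending); // one channel transaction in Flume terms
        pending.clear();
        lastFlushMillis = nowMillis;
        return batch;
    }
}
```

With a batchSize of 3 and a batchTimeout of 3000, three quick lines are committed together, while a lone fourth line waits until a check at least 3 seconds after the previous flush.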

Spooling Directory Source

In an effort to avoid all the assumptions inherent in tailing a file, a new source was devised to keep track of which files have been converted into Flume events and which still need to be processed. The SpoolingDirectorySource is given a directory to watch for new files. It assumes that files copied to this directory are complete; otherwise, the source might try to send a partial file. It also assumes that filenames never change; otherwise, on restarting, the source would forget which files have been sent and which have not. The filename condition can be met in log4j by using DailyRollingFileAppender rather than RollingFileAppender. However, the currently open file would need to be written to one directory and copied to the spool directory after being closed. None of the appenders that ship with log4j have this capability.

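That said, if you are using the Linux logrotate program in your environment, this might be of interest. You can move completed files to a separate directory using a postrotate script. The final flow might then look something like this: the application writes to its log file, logrotate rotates it, the postrotate script moves the closed file into the spool directory, and the Spooling Directory Source turns it into Flume events.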
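If you control the process that produces the files, the same idea can be applied directly: finish writing the file somewhere else, then move it into the spool directory with one atomic rename under a name that is never reused. This sketch assumes the closed file and the spool directory are on the same filesystem (ATOMIC_MOVE fails otherwise). The prefix-epochMillis-UUID.log naming scheme and the /var/log/app paths in main are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

// Hands a finished file to the spooling directory with one atomic rename under a unique name.
// Assumptions: both directories share a filesystem; the naming scheme is hypothetical.
final class SpoolDelivery {

    static Path deliver(Path closedFile, Path spoolDir, String prefix) {
        String uniqueName = prefix + "-" + System.currentTimeMillis() + "-" + UUID.randomUUID() + ".log";
        try {
            // ATOMIC_MOVE: the source never sees a half-copied file (fails across filesystems).
            return Files.move(closedFile, spoolDir.resolve(uniqueName), StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path staging = Files.createDirectories(Paths.get("/var/log/app/staging")); // hypothetical
        Path spool = Files.createDirectories(Paths.get("/var/log/app/spool"));     // hypothetical
        Path closed = Files.write(staging.resolve("app.log.1"), "line1\n".getBytes(StandardCharsets.UTF_8));
        System.out.println("delivered " + deliver(closed, spool, "app"));
    }
}
```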


To create a Spooling Directory Source, set the type property to spooldir. You must specify the directory to watch by setting the spoolDir property:

agent.sources=s1

agent.sources.s1.channels=c1

agent.sources.s1.type=spooldir

agent.sources.s1.spoolDir=/path/to/files

Here is a summary of the properties for the Spooling Directory Source:

Key                  Required  Type                                  Default
type                 Yes       String                                spooldir
channels             Yes       String                                space separated list of channels
spoolDir             Yes       String                                path to directory to spool
fileSuffix           No        String                                .COMPLETED
deletePolicy         No        String (never or immediate)           never
fileHeader           No        boolean                               false
fileHeaderKey        No        String                                file
basenameHeader       No        boolean                               false
basenameHeaderKey    No        String                                basename
ignorePattern        No        String                                ^$
trackerDir           No        String                                ${spoolDir}/.flumespool
consumeOrder         No        String (oldest, youngest, or random)  oldest
batchSize            No        int                                   10
bufferMaxLines       No        int                                   100
maxBufferLineLength  No        int                                   5000
maxBackoff           No        int                                   4000 (milliseconds)

When a file has been completely transmitted, it will be renamed with a .COMPLETED extension, unless overridden by setting the fileSuffix property, like this:

agent.sources.s1.fileSuffix=.DONE

Starting with Flume 1.4, a new property, deletePolicy, was created to remove completed files from the filesystem rather than just marking them as done. In a production environment, this is critical because your spool disk will fill up over time. Currently, you can only set this for immediate deletion or to leave the files forever. If you want delayed deletion, you'll need to implement your own periodic (cron) job, perhaps using the find command to find files in the spool directory with the COMPLETED file suffix and a modification time older than some threshold, for example:

find /path/to/spool/dir -type f -name "*.COMPLETED" -mtime +7 -exec rm {} \;

This will find all files completed more than seven days ago and delete them.
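If a JVM task is easier to deploy on your agents than a cron entry, the same cleanup can be sketched in Java. This hypothetical helper looks only at the top level of the spool directory (the source ignores subdirectories anyway) and deletes regular files that end with the given suffix and whose modification time is older than the given age.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Delayed-deletion sketch for completed spool files (hypothetical helper, not part of Flume).
// Only the top level of the spool directory is scanned.
final class CompletedFileReaper {

    /** Deletes files ending in suffix that were last modified before now - maxAge; returns the count. */
    static int deleteOlderThan(Path spoolDir, String suffix, Duration maxAge, Instant now) {
        Instant cutoff = now.minus(maxAge);
        try (Stream<Path> entries = Files.list(spoolDir)) {
            List<Path> victims = entries
                    .filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(suffix))
                    .filter(p -> lastModified(p).isBefore(cutoff))
                    .collect(Collectors.toList());
            for (Path p : victims) {
                Files.deleteIfExists(p);
            }
            return victims.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static Instant lastModified(Path p) {
        try {
            return Files.getLastModifiedTime(p).toInstant();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A call such as deleteOlderThan(Paths.get("/path/to/spool/dir"), ".COMPLETED", Duration.ofDays(7), Instant.now()) corresponds roughly to the find command above.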

If you want the absolute file path attached to each event, set the fileHeader property to true. This will create a header with the file key, unless you choose a different key with the fileHeaderKey property. For example, the following configuration would add the {sourceFile=/path/to/files/foo.1234.log} header if the event was read from the /path/to/files/foo.1234.log file:

agent.sources.s1.fileHeader=true

agent.sources.s1.fileHeaderKey=sourceFile

The related property, basenameHeader, if set to true, will add a header with the basename key, which contains just the filename. The basenameHeaderKey property allows you to change the key's name, as shown here:

agent.sources.s1.basenameHeader=true

agent.sources.s1.basenameHeaderKey=justTheName

This configuration would add the {justTheName=foo.1234.log} header if the event was read from the same file located at /path/to/files/foo.1234.log.

If there are certain file patterns that you do not want this source to read as input, you can pass a regular expression using the ignorePattern property. Personally, I won't copy any files I don't want transferred to Flume in the spoolDir in the first place. If this situation cannot be avoided, use the ignorePattern property to pass a regular expression to match filenames that should not be transferred as data. Furthermore, subdirectories and files that start with a "." (period) character are ignored, so you can avoid costly regular expression processing using this convention instead.

While Flume is sending data, it keeps track of how far it has gotten in each file by keeping a metadata file in the directory specified by the trackerDir property. By default, this file will be .flumespool under spoolDir. Should you want a location other than inside spoolDir, you can specify an absolute file path.

Files in the directory are processed in the "oldest first" manner as calculated by looking at the modification times of the files. You can change this behavior by setting the consumeOrder property. If you set this property to youngest, the newest files will be processed first. This may be desired if data is time sensitive. If you'd rather give equal precedence to all files, you can set this property to a value of random.
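Here is a small model of the three consumeOrder settings. The SpoolFile class is a hypothetical stand-in for a filename plus its modification time, and breaking ties by name is an assumption made here for deterministic output, not documented Flume behavior.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

// Model of consumeOrder: oldest (default), youngest, or random, by modification time.
// Tie-breaking by file name is an assumption of this sketch.
final class ConsumeOrder {

    static final class SpoolFile {
        final String name;
        final long mtimeMillis;

        SpoolFile(String name, long mtimeMillis) {
            this.name = name;
            this.mtimeMillis = mtimeMillis;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    static List<SpoolFile> order(List<SpoolFile> files, String consumeOrder, Random random) {
        List<SpoolFile> result = new ArrayList<>(files);
        Comparator<SpoolFile> oldestFirst = Comparator
                .comparingLong((SpoolFile f) -> f.mtimeMillis)
                .thenComparing(f -> f.name);
        switch (consumeOrder) {
            case "oldest":
                result.sort(oldestFirst);
                break;
            case "youngest":
                result.sort(oldestFirst.reversed());
                break;
            case "random":
                Collections.shuffle(result, random);
                break;
            default:
                throw new IllegalArgumentException("unknown consumeOrder: " + consumeOrder);
        }
        return result;
    }
}
```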

The batchSize property allows you to tune the number of events per transaction for writes to the channel. Increasing this may provide better throughput at the cost of larger transactions (and possibly larger rollbacks). The bufferMaxLines property, multiplied by maxBufferLineLength, sets the size of the memory buffer used when reading files (with the defaults, 100 × 5000 = 500,000 characters). If your lines are very short, you might consider increasing bufferMaxLines while reducing maxBufferLineLength, which can improve throughput without increasing your memory overhead. That said, if you have events longer than 5000 characters, you'll want to set maxBufferLineLength higher.

If there is a problem writing data to the channel, a ChannelException is thrown back to the source, which will retry after an initial wait time of 250 ms. Each failed attempt doubles this time, up to a maximum of 4 seconds. To raise this maximum, set the maxBackoff property. For instance, if I wanted a maximum of 5 minutes, I could set it in milliseconds like this:

agent.sources.s1.maxBackoff=300000
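The retry schedule is easy to work out by hand, but a few lines of arithmetic make the effect of maxBackoff explicit. This sketch simply applies the rule described above (start at 250 ms, double after each failure, never exceed the cap); the class is hypothetical and is not Flume's source.

```java
import java.util.ArrayList;
import java.util.List;

// Applies the documented backoff rule: start at initialMillis, double per failure, cap at max.
final class SpoolBackoff {

    static List<Long> schedule(int failedAttempts, long initialMillis, long maxBackoffMillis) {
        List<Long> waits = new ArrayList<>();
        long wait = initialMillis;
        for (int i = 0; i < failedAttempts; i++) {
            waits.add(Math.min(wait, maxBackoffMillis));
            wait = Math.min(wait * 2, maxBackoffMillis); // capping here also avoids overflow
        }
        return waits;
    }

    public static void main(String[] args) {
        System.out.println(schedule(6, 250, 4_000));    // default maxBackoff
        System.out.println(schedule(12, 250, 300_000)); // maxBackoff=300000
    }
}
```

With the default cap, the waits are 250, 500, 1000, 2000, and 4000 ms, then 4000 ms for every further failure. With maxBackoff=300000, the wait is 256,000 ms on the 11th consecutive failure and reaches the 5-minute cap on the 12th.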

Finally, you'll want to ensure that whatever mechanism is writing new files to your spooling directory creates unique filenames, such as adding a timestamp (and possibly more). Reusing a filename will confuse the source, and your data may not be processed.

As always, remember that restarts and errors will create duplicates due to retransmission of files partially sent, but not marked as complete, or because the metadata is incomplete.

Syslog sources

Syslog has been around for decades and is often used as an operating-system-level mechanism to capture and move logs around systems. In many ways, there are overlaps with some of the functionality Flume provides. There is even a Hadoop module for rsyslog, one of the more modern variants of syslog (http://www.rsyslog.com/doc/rsyslog_conf_modules.html/omhdfs.html). Generally, I don't like solutions that couple technologies that may version independently. If you use this rsyslog/Hadoop integration, you would be required to update the version of Hadoop you compiled into rsyslog at the same time you upgraded your Hadoop cluster to a new major version. This may be logistically difficult if you have a large number of servers and/or environments. Backward compatibility in Hadoop wire protocols is something that is being actively worked on in the Hadoop community, but currently, it isn't the norm. We'll talk more about this in Chapter 8, Monitoring Flume, when we discuss tiering data flows.

Syslog has an older UDP transport as well as a newer TCP protocol that can handle data larger than a single UDP packet can transmit (about 64 KB) and deal with network-related congestion events that might require the data to be retransmitted.

Finally, there are some undocumented properties of syslog sources that allow us to add more regular-expression pattern-matching for messages that do not conform to RFC standards. I won't be discussing these additional settings, but you should be aware of them if you run into frequent parsing errors. In this case, take a look at the source for org.apache.flume.source.SyslogUtils for implementation details to find the cause.

More details on syslog terms (such as a facility) and standard formats can be found in RFC 3164 at http://tools.ietf.org/html/rfc3164.

The syslog UDP source

The UDP version of syslog is usually safe to use when you are receiving data from the server's local syslog process, provided the data is small enough (less than about 64 KB).

Note

The implementation for this source has chosen 2,500 bytes as the maximum payload size regardless of what your network can actually handle. So if your payload will be larger than this, use one of the TCP sources instead.

To create a Syslog UDP source, set the type property to syslogudp. You must set the port to listen on using the port property. The optional host property specifies the bind address. If no host is specified, all IPs for the server will be used, which is the same as specifying 0.0.0.0. In this example, we will only listen for local UDP connections on port 5140:

agent.sources=s1

agent.sources.s1.channels=c1

agent.sources.s1.type=syslogudp

agent.sources.s1.host=localhost

agent.sources.s1.port=5140

If you want syslog to forward messages to this source, you can add a line like this to your syslog configuration file:

*.err;*.alert;*.crit;*.emerg;kern.*   @localhost:5140

This will send all messages of error, alert, critical, and emergency priority, plus kernel messages of any priority, to your Flume source. The single @ symbol designates that the UDP protocol should be used.
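To check that the source is receiving data without touching your syslog configuration, you can send a test datagram yourself. In this sketch, the class name and sample message are hypothetical. The <11> prefix encodes facility user and severity err, and the 2,500-byte guard mirrors the limit mentioned in the note above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;

// Test sender for the syslogudp source: one RFC 3164-style message per datagram.
// The 2,500-byte guard mirrors the source's internal limit; message contents are hypothetical.
final class SyslogUdpSender {
    static final int MAX_PAYLOAD = 2_500;

    static void send(String host, int port, String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        if (payload.length > MAX_PAYLOAD) {
            throw new IllegalArgumentException("message is " + payload.length + " bytes; use a TCP source");
        }
        try (DatagramSocket socket = new DatagramSocket()) {
            InetAddress address = InetAddress.getByName(host);
            socket.send(new DatagramPacket(payload, payload.length, address, port));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // <11> = facility 1 (user) * 8 + severity 3 (err)
        send("localhost", 5140, "<11>Jan  1 00:00:00 myhost myapp: disk almost full");
    }
}
```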

Here is a summary of the properties of the Syslog UDP source:

Key         Required  Type     Default
type        Yes       String   syslogudp
channels    Yes       String   space separated list of channels
port        Yes       int
host        No        String   0.0.0.0
keepFields  No        boolean  false

The keepFields property tells the source to include the syslog fields as part of the body. By default, these are simply removed, as they become Flume header values.

The Flume headers created by the Syslog UDP source are summarized here:

Header Key           Description
Facility             This is the syslog facility. See the syslog documentation.
Priority             This is the syslog priority. See the syslog documentation.
timestamp            This is the time of the syslog event, translated into an epoch timestamp. It's omitted if it's not parsed from one of the standard RFC formats.
hostname             This is the parsed hostname in the syslog message. It is omitted if it is not parsed.
flume.syslog.status  There was a problem parsing the syslog message's headers. This value is set to Invalid if the payload didn't conform to the RFCs, and set to Incomplete if the message was longer than the eventSize value (for UDP, this is set internally to 2,500 bytes). It's omitted if everything is fine.
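The Facility and Priority values come from the <PRI> prefix at the start of an RFC 3164 message, which encodes facility * 8 + severity. This sketch is a hypothetical helper, not Flume's parser in SyslogUtils. It decodes that prefix so that you can check what a given test message carries.

```java
// Decodes the <PRI> prefix of an RFC 3164 message: PRI = facility * 8 + severity.
// A hypothetical helper for checking messages, not Flume's SyslogUtils parser.
final class SyslogPri {

    /** Returns {facility, severity}, or null when there is no valid <PRI> prefix. */
    static int[] parse(String message) {
        if (message == null || !message.startsWith("<")) {
            return null;
        }
        int close = message.indexOf('>');
        if (close < 2 || close > 4) {
            return null; // PRI must be 1 to 3 digits
        }
        String digits = message.substring(1, close);
        for (char c : digits.toCharArray()) {
            if (c < '0' || c > '9') {
                return null;
            }
        }
        int pri = Integer.parseInt(digits);
        if (pri > 191) {
            return null; // 24 facilities (0-23) times 8 severities, minus 1
        }
        return new int[] {pri / 8, pri % 8};
    }

    public static void main(String[] args) {
        int[] fs = parse("<34>Oct 11 22:14:15 mymachine su: 'su root' failed for lonvick on /dev/pts/8");
        System.out.println("facility=" + fs[0] + " severity=" + fs[1]);
    }
}
```

The RFC's own example, <34>, decodes to facility 4 (security/authorization messages) and severity 2 (critical).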

The syslog TCP source

As previously mentioned, the Syslog TCP source provides an endpoint for messages over TCP, allowing for a larger payload size and TCP retry semantics that should be used for any reliable inter-server communications.

To create a Syslog TCP source, set the type property to syslogtcp. As with the UDP source, you must set the port to listen on, and you can optionally set the bind address:

agent.sources=s1

agent.sources.s1.channels=c1

agent.sources.s1.type=syslogtcp

agent.sources.s1.host=0.0.0.0

agent.sources.s1.port=12345

If your syslog implementation supports syslog over TCP, the configuration is usually the same, except that a double @ symbol is used to indicate TCP transport. Here is the same example using TCP, where I am forwarding the values to a Flume agent that is running on a different server named flume-1.

*.err;*.alert;*.crit;*.emerg;kern.*   @@flume-1:12345
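The equivalent TCP test sender is only slightly longer. It assumes newline-terminated framing (one message per line), which is what plain syslog forwarding over TCP typically uses. The class name and message are hypothetical; flume-1:12345 comes from the example above.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Test sender for the syslogtcp source. Assumption: each message is terminated by a newline.
final class SyslogTcpSender {

    static void send(String host, int port, List<String> messages) {
        try (Socket socket = new Socket(host, port);
             OutputStream out = socket.getOutputStream()) {
            for (String message : messages) {
                out.write((message + "\n").getBytes(StandardCharsets.UTF_8));
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        send("flume-1", 12345, List.of("<11>Jan  1 00:00:00 myhost myapp: disk almost full"));
    }
}
```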

There are some optional properties for the Syslog TCP source, as listed here:

Key         Required  Type         Default
type        Yes       String       syslogtcp
channels    Yes       String       space separated list of channels
port        Yes       int
host        No        String       0.0.0.0
keepFields  No        boolean      false
eventSize   No        int (bytes)  2500 bytes

The keepFields property tells the source to include the syslog fields as part of the body. By default, these are simply removed, as they become Flume header values.

The Flume headers created by the Syslog TCP source are summarized here:

Header Key           Description
Facility             This is the syslog facility. See the syslog documentation.
Priority             This is the syslog priority. See the syslog documentation.
timestamp            This is the time of the syslog event translated into an epoch timestamp. It's omitted if not parsed from one of the standard RFC formats.
hostname             The parsed hostname in the syslog message. It's omitted if not parsed.
flume.syslog.status  There was a problem parsing the syslog message's headers. It's set to Invalid if the payload didn't conform to the RFCs and set to Incomplete if the message was longer than the configured eventSize. It's omitted if everything is fine.

The multiport syslog TCP source

The Multiport Syslog TCP source is nearly identical in functionality to the Syslog TCP source, except that it can listen to multiple ports for input. You may need to use this capability if you are unable to change which port syslog will use in its forwarding rules (it may not be your server at all). It is more likely that you will use this to read multiple formats using one source to write to different channels. We'll cover that in a moment in the Channel Selectors section. Under the hood, a high-performance asynchronous TCP library called Mina (https://mina.apache.org/) is used, which often provides better throughput on multicore servers even when consuming only a single TCP port.

To configure this source, set the type property to multiport_syslogtcp:

agent.sources.s1.type=multiport_syslogtcp

Like the other syslog sources, you need to specify the port, but in this case, it is a space-separated list of ports (a single port works too). The property for this is ports (plural):

agent.sources.s1.type=multiport_syslogtcp

agent.sources.s1.channels=c1

agent.sources.s1.ports=33333 44444

agent.sources.s1.host=0.0.0.0

This code configures the Multiport Syslog TCP source named s1 to listen to any incoming connections on ports 33333 and 44444 and send them to channel c1.

In order to tell which event came from which port, you can set the optional portHeader property to the name of the key whose value will be the port number. Let's add this property to the configuration:

agent.sources.s1.portHeader=port

Then, any events received from port 33333 would have a header key/value of {"port"="33333"}. As you saw in Chapter 4, Sinks and Sink Processors, you can now use this value (or any header) as a part of your HDFS sink file path convention, like this:

agent.sinks.k1.hdfs.path=/logs/%{hostname}/%{port}/%Y/%m/%d/%H
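Here is a sketch of how %{...} placeholders pick up header values such as the one set by portHeader. The helper is hypothetical: it assumes that a missing header becomes an empty string, and it leaves time escapes such as %Y untouched, because the HDFS sink fills those in from the event's timestamp.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Substitutes %{header} placeholders in a path (hypothetical helper, not Flume's BucketPath).
// Assumptions: missing headers become ""; time escapes such as %Y are left as-is.
final class HeaderPathSketch {
    private static final Pattern HEADER = Pattern.compile("%\\{([^}]+)\\}");

    static String resolve(String path, Map<String, String> headers) {
        Matcher m = HEADER.matcher(path);
        StringBuilder sb = new StringBuilder();
        while (m.find()) {
            String value = headers.getOrDefault(m.group(1), "");
            m.appendReplacement(sb, Matcher.quoteReplacement(value)); // treat value literally
        }
        m.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(resolve("/logs/%{hostname}/%{port}/%Y/%m/%d/%H",
                Map.of("hostname", "web1", "port", "33333")));
    }
}
```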

Here is a complete table of the properties:

Key                 Required  Type         Default
type                Yes       String       multiport_syslogtcp
channels            Yes       String       Space-separated list of channels
ports               Yes       String       Space-separated list of port numbers
host                No        String       0.0.0.0
keepFields          No        boolean      false
eventSize           No        int          2500 (bytes)
portHeader          No        String
batchSize           No        int          100
readBufferSize      No        int (bytes)  1024
numProcessors       No        int          automatically detected
charset.default     No        String       UTF-8
charset.port.PORT#  No        String

This TCP source has some additional tunable options over the standard Syslog TCP source. The first is the batchSize property, which is the number of events processed per transaction with the channel. There is also the readBufferSize property, which specifies the size of the buffer used internally by the Mina library. Finally, the numProcessors property is used to size the worker thread pool in Mina. Before deviating from the defaults, you may want to familiarize yourself with Mina (http://mina.apache.org/) and look at the source code.

Finally, you can specify the default and per-port character encoding to use when converting between Strings and bytes:

agent.sources.s1.charset.default=UTF-16

agent.sources.s1.charset.port.33333=UTF-8

This sample configuration shows that all ports will be interpreted using UTF-16 encoding, except for port 33333 traffic, which will use UTF-8.
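To see why a wrong encoding matters, consider what happens when bytes a sender encoded as UTF-8 are decoded as UTF-16. This hypothetical helper just round-trips a string through two charsets using the standard java.nio.charset API.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Encodes text with the sender's charset and decodes it with the receiver's charset,
// as happens when charset.default or a charset.port setting does not match the sender.
final class CharsetMismatch {

    static String roundTrip(String text, Charset sender, Charset receiver) {
        byte[] wire = text.getBytes(sender);
        return new String(wire, receiver); // malformed input is replaced, not reported
    }

    public static void main(String[] args) {
        String garbled = roundTrip("hi", StandardCharsets.UTF_8, StandardCharsets.UTF_16);
        System.out.printf("%d char(s), first is U+%04X%n", garbled.length(), (int) garbled.charAt(0));
    }
}
```

Round-tripping hi from UTF-8 to UTF-16 yields the single character U+6869 rather than two ASCII letters, and no error is raised, which is why a mismatch can go unnoticed.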

As you've already seen in the other syslog sources, the keepFields property tells the source to include the syslog fields as part of the body. By default, these are simply removed, as they become Flume header values.

The Flume headers created by this source are summarized here:

Header Key           Description
Facility             This is the syslog facility. See the syslog documentation.
Priority             This is the syslog priority. See the syslog documentation.
timestamp            This is the time of the syslog event translated into an epoch timestamp. Omitted if not parsed from one of the standard RFC formats.
hostname             This is the parsed hostname in the syslog message. Omitted if not parsed.
flume.syslog.status  There was a problem parsing the syslog message's headers. This is set to Invalid if the payload didn't conform to the RFCs, set to Incomplete if the message was longer than the configured eventSize, and omitted if everything is fine.

JMS source

Sometimes, data can originate from asynchronous message queues. For these cases, you can use Flume's JMS source to create events read from a JMS Queue or Topic. While it is theoretically possible to use any Java Message Service (JMS) implementation, Flume has only been tested with ActiveMQ, so be sure to test thoroughly if you use a different provider.

Like the previously covered ElasticSearch sink in Chapter 4, Sinks and Sink Processors, Flume does not come packaged with the JMS implementation you'll be using, as the versions need to match up, so you'll need to include the necessary implementation JAR files on the Flume agent's classpath. The preferred method is to use the --plugins-dir parameter mentioned in Chapter 2, A Quick Start Guide to Flume, which we'll cover in more detail in the next chapter.

Note

ActiveMQ is just one provider of the Java Message Service API. For more information, see the project homepage at http://activemq.apache.org.

To configure this source, set the type property to jms:

agent.sources.s1.type=jms

The first three properties are used to establish a connection with the JMS server. They are initialContextFactory, connectionFactory, and providerURL. The initialContextFactory property for ActiveMQ will be org.apache.activemq.jndi.ActiveMQInitialContextFactory. The connectionFactory property will be the registered JNDI name, which defaults to ConnectionFactory if unspecified. Finally, providerURL is the connection String passed to the connection factory to actually establish a network connection. It is usually a URL-like String consisting of the server name and port information. If you aren't familiar with your JMS configuration, ask someone who is what values to use in your environment.

This table summarizes these settings and others we'll discuss in a moment:

Key                    Required  Type    Default
type                   Yes       String  jms
channels               Yes       String  space separated list of channels
initialContextFactory  Yes       String
connectionFactory      No        String  ConnectionFactory
providerURL            Yes       String
userName               No        String  username for authentication
passwordFile           No        String  path to file containing password for authentication
destinationName        Yes       String
destinationType        Yes       String
messageSelector        No        String
errorThreshold         No        int     10
pollTimeout            No        long    1000 (milliseconds)
batchSize              No        int     100

If your JMS Server requires authentication, pass the userName and passwordFile properties:

agent.sources.s1.userName=jms_bot_user

agent.sources.s1.passwordFile=/path/to/password.txt

Putting the password in a file you reference, rather than directly in the Flume configuration, allows you to keep the permissions on the configuration open for inspection, while storing the more sensitive password data in a separate file that is accessible only to the Flume agent (restrictions are commonly provided by the operating system's permissions system).
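If you write your own tooling around such a password file, it is worth refusing to use it when its permissions are too open. Here is a minimal sketch that assumes a POSIX filesystem (Files.getPosixFilePermissions throws UnsupportedOperationException elsewhere). The class name and the decision to trim surrounding whitespace are illustrative choices, not a description of how Flume reads the file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.util.EnumSet;
import java.util.Set;

// Reads a password file only if group and others have no read/write access (POSIX only).
// Trimming whitespace (e.g. a trailing newline) is an assumption of this sketch.
final class PasswordFile {
    private static final Set<PosixFilePermission> TOO_OPEN = EnumSet.of(
            PosixFilePermission.GROUP_READ, PosixFilePermission.GROUP_WRITE,
            PosixFilePermission.OTHERS_READ, PosixFilePermission.OTHERS_WRITE);

    static String read(Path file) {
        try {
            Set<PosixFilePermission> perms = Files.getPosixFilePermissions(file);
            for (PosixFilePermission p : perms) {
                if (TOO_OPEN.contains(p)) {
                    throw new IllegalStateException(file + " is accessible by group/others: " + perms);
                }
            }
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```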

The destinationName property names the queue or topic to read messages from. Since JMS supports both queues and topics, you need to set the destinationType property to queue or topic, respectively. Here's what the properties might look like for a queue:

agent.sources.s1.destinationName=my_cool_data

agent.sources.s1.destinationType=queue

For a topic, it will look as follows:

agent.sources.s1.destinationName=restart_events

agent.sources.s1.destinationType=topic

The difference between a queue and a topic is basically how many consumers can read each message from the named destination. If you are reading from a queue, the message will be removed from the queue once it is written to Flume as an event. A message published to a topic, once read by Flume, is still available for other subscribers to read.

Should you need only a subset of messages published to a topic or queue, the optional messageSelector String property provides this capability. For example, to create Flume events only from messages with a property called Age whose value is larger than 10, I can specify a messageSelector filter:

agent.sources.s1.messageSelector=Age > 10

Note

Describing the selector capabilities and syntax in detail is far beyond the scope of this book. See http://docs.oracle.com/javaee/1.4/api/javax/jms/Message.html or pick up a book on JMS to become more familiar with JMS message selectors.

Should there be a problem in communicating with the JMS server, the connection will reset itself after a number of failures specified by the errorThreshold property. The default value of 10 is reasonable and will most likely not need to be changed.

Next, the JMS source has a batchSize property, which defaults to 100. By now, you should be fairly familiar with adjusting batch sizes with other sources and sinks. For high-volume flows, you'll want to set this higher to consume data in larger chunks from your JMS server and get higher throughput. Setting this too high for low-volume flows could delay processing. As always, testing is the only sure way to adjust this properly. The related pollTimeout property specifies how long to wait for new messages to appear before attempting to read a batch. The default, specified in milliseconds, is 1 second. If no messages are read before the poll timeout, the source will go into an exponential backoff mode before attempting another read. This could delay the processing of messages until it wakes up again. Chances are that you won't need to change this value from the default.

When the message gets converted into a Flume event, the message properties become Flume headers. The message payload becomes the Flume body, but since JMS uses serialized Java objects, we need to tell the JMS source how to interpret the payload. We do this by setting the converter.type property, which defaults to DEFAULT, the only implementation packaged with Flume (implemented by the DefaultJMSMessageConverter class). It can deserialize JMS BytesMessages, TextMessages, and ObjectMessages (Java objects that implement the java.io.Serializable interface). It does not handle StreamMessages or MapMessages, so if you need to process them, you'll need to implement your own converter type. To do this, you'll implement the org.apache.flume.source.jms.JMSMessageConverter interface and use its fully qualified class name as the converter.type property value.

For completeness, here are these properties in tabular form:

Key                Required  Type    Default
converter.type     No        String  DEFAULT
converter.charset  No        String  UTF-8

Channel selectors

As we discussed in Chapter 1, Overview and Architecture, a source can write to one or more channels. This is why the property is plural (channels instead of channel). There are two ways multiple channels can be handled. The event can be written to all the channels or to just one channel, based on some Flume header value. The internal mechanism for this in Flume is called a channel selector.

The selector for any source can be specified using the selector.type property. All selector-specific properties begin with the usual source prefix: the agent name, the keyword sources, and the source name:

agent.sources.s1.selector.type=replicating

Replicating

If you do not specify a selector for a source, replicating is the default. The replicating selector writes the same event to all channels in the source's channels list:

agent.sources.s1.channels=c1 c2 c3

agent.sources.s1.selector.type=replicating

In this example, every event will be written to all three channels: c1, c2, and c3.

There is an optional property on this selector, called optional. It is a space-separated list of channels that are optional. Consider this modified example:

agent.sources.s1.channels=c1 c2 c3

agent.sources.s1.selector.type=replicating

agent.sources.s1.selector.optional=c2 c3

Now, any failure to write to channels c2 or c3 will not cause the transaction to fail, and any data written to c1 will be committed. In the earlier example with no optional channels, any single channel failure would roll back the transaction for all channels.
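The effect of the optional list can be modeled as follows. In this toy sketch, the failing set stands in for channels that reject a put (because they are full or broken). A failure on a required channel fails the whole put, while failures on optional channels are ignored. It is a hypothetical illustration of the semantics above, not Flume's ChannelProcessor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model of the replicating selector with optional channels (not Flume's ChannelProcessor).
final class ReplicatingModel {

    /** Returns the channels that accepted the event; throws if any required channel fails. */
    static List<String> put(List<String> channels, Set<String> optional, Set<String> failing) {
        List<String> accepted = new ArrayList<>();
        for (String channel : channels) {
            if (!failing.contains(channel)) {
                accepted.add(channel);
            } else if (!optional.contains(channel)) {
                throw new IllegalStateException("required channel " + channel + " failed; the put fails");
            }
            // failures on optional channels are ignored
        }
        return accepted;
    }
}
```

With channels c1 c2 c3 and optional c2 c3, a failure on c3 still delivers the event to c1 and c2; with no optional channels, the same failure fails the put.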

Multiplexing

If you want to send different events to different channels, you should use a multiplexing channel selector by setting the value of selector.type to multiplexing. You also need to tell the channel selector which header to use by setting the selector.header property:

agent.sources.s1.selector.type=multiplexing

agent.sources.s1.selector.header=port

Let's assume we used the Multiport Syslog TCP source to listen on four ports (11111, 22222, 33333, and 44444) with a portHeader setting of port:

agent.sources.s1.selector.default=c2

agent.sources.s1.selector.mapping.11111=c1 c2

agent.sources.s1.selector.mapping.44444=c2

agent.sources.s1.selector.optional.44444=c3

This configuration will result in the traffic of port 22222 and port 33333 going to the c2 channel only. The traffic of port 11111 will go to the c1 and c2 channels. A failure on either channel would result in nothing being added to either channel. The traffic of port 44444 will go to channels c2 and c3. However, a failure to write to c3 will still commit the transaction to c2, and c3 will not be attempted again with that event.
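The routing rules above can be expressed as a small lookup model. In this sketch (a hypothetical class), the header value selects the required channels from the mapping entries, falling back to selector.default when there is no match, and selects optional channels from the optional entries. Giving unmapped values no optional channels is an assumption of the sketch.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Toy model of the multiplexing selector (hypothetical class, not Flume's implementation).
final class MultiplexingModel {
    private final String header;
    private final Map<String, List<String>> requiredByValue; // selector.mapping.<value>
    private final Map<String, List<String>> optionalByValue; // selector.optional.<value>
    private final List<String> defaultChannels;              // selector.default

    MultiplexingModel(String header, Map<String, List<String>> requiredByValue,
                      Map<String, List<String>> optionalByValue, List<String> defaultChannels) {
        this.header = header;
        this.requiredByValue = requiredByValue;
        this.optionalByValue = optionalByValue;
        this.defaultChannels = defaultChannels;
    }

    /** Channels that must accept the event; unmatched or missing header values use the default. */
    List<String> required(Map<String, String> eventHeaders) {
        List<String> channels = lookup(requiredByValue, eventHeaders.get(header));
        return channels != null ? channels : defaultChannels;
    }

    /** Channels whose failures are ignored; none for unmapped values (assumption). */
    List<String> optional(Map<String, String> eventHeaders) {
        List<String> channels = lookup(optionalByValue, eventHeaders.get(header));
        return channels != null ? channels : Collections.emptyList();
    }

    private static List<String> lookup(Map<String, List<String>> map, String value) {
        return value == null ? null : map.get(value);
    }

    public static void main(String[] args) {
        MultiplexingModel selector = new MultiplexingModel("port",
                Map.of("11111", List.of("c1", "c2"), "44444", List.of("c2")),
                Map.of("44444", List.of("c3")),
                List.of("c2"));
        for (String port : List.of("11111", "22222", "33333", "44444")) {
            Map<String, String> headers = Map.of("port", port);
            System.out.println(port + " -> required " + selector.required(headers)
                    + ", optional " + selector.optional(headers));
        }
    }
}
```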

Summary

In this chapter, we covered in depth the various sources that we can use to insert log data into Flume, including the Exec source, the Spooling Directory Source, Syslog sources (UDP, TCP, and multiport TCP), and the JMS source.

We discussed ways to replicate the old TailSource functionality from Flume 0.9 and the problems with using tail semantics in general.

We also covered channel selectors and sending events to one or more channels, specifically the replicating and multiplexing channel selectors.

Optional channels were also discussed as a way to keep a failure on some of the channels from failing the entire put transaction when more than one channel is used.

In the next chapter, we'll introduce interceptors that will allow in-flight inspection and transformation of events. Used in conjunction with channel selectors, interceptors provide the final piece to create complex data flows with Flume. Additionally, we will cover RPC mechanisms (source/sink pairs) between Flume agents using both Avro and Thrift, which can be used to create complex data flows.