Hadoop in Practice, Second Edition (2015)

Part 2. Data logistics

Chapter 5. Moving data into and out of Hadoop

This chapter covers

·        Understanding key design considerations for data ingress and egress tools

·        Low-level methods for moving data into and out of Hadoop

·        Techniques for moving log files and relational and NoSQL data, as well as data in Kafka, in and out of HDFS

Data movement is one of those things that you aren’t likely to think too much about until you’re fully committed to using Hadoop on a project, at which point it becomes this big scary unknown that has to be tackled. How do you get your log data sitting across thousands of hosts into Hadoop? What’s the most efficient way to get your data out of your relational and No/NewSQL systems and into Hadoop? How do you get Lucene indexes generated in Hadoop out to your servers? And how can these processes be automated?

Welcome to chapter 5, where the goal is to answer these questions and set you on your path to worry-free data movement. In this chapter you’ll first see how data across a broad spectrum of locations and formats can be moved into Hadoop, and then you’ll see how data can be moved out of Hadoop.

This chapter starts by highlighting key data-movement properties, so that as you go through the rest of this chapter you can evaluate the fit of the various tools. It goes on to look at low-level and high-level tools that can be used to move your data. We’ll start with some simple techniques, such as using the command line and Java for ingress,[1] but we’ll quickly move on to more advanced techniques like using NFS and DistCp.

1 Ingress and egress refer to data movement into and out of a system, respectively.

Once the low-level tooling is out of the way, we’ll survey higher-level tools that have simplified the process of ferrying data into Hadoop. We’ll look at how you can automate the movement of log files with Flume, and how Sqoop can be used to move relational data. So as not to ignore some of the emerging data systems, you’ll also be introduced to methods that can be employed to move data from HBase and Kafka into Hadoop.

We’ll cover a lot of ground in this chapter, and it’s likely that you’ll have specific types of data you need to work with. If this is the case, feel free to jump directly to the section that provides the details you need.

Let’s start things off with a look at key ingress and egress system considerations.

5.1. Key elements of data movement

Moving large quantities of data in and out of Hadoop offers logistical challenges that include consistency guarantees and resource impacts on data sources and destinations. Before we dive into the techniques, however, we need to discuss the design elements you should be aware of when working with data movement.


Idempotence

An idempotent operation produces the same result no matter how many times it’s executed. In a relational database, inserts typically aren’t idempotent, because executing them multiple times doesn’t produce the same resulting database state. Updates, by contrast, often are idempotent, because they’ll produce the same end result.

Any time data is being written, idempotence should be a consideration, and data ingress and egress in Hadoop are no different. How well do distributed log collection frameworks deal with data retransmissions? How do you ensure idempotent behavior in a MapReduce job where multiple tasks are inserting into a database in parallel? We’ll examine and answer these questions in this chapter.
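The difference between a non-idempotent insert and an idempotent write can be seen in a few lines. The following is a minimal sketch using Python's built-in sqlite3 module as a stand-in for a relational database; the table names and the single-record batch are hypothetical and exist only for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE plain_events (id INTEGER, payload TEXT)")
conn.execute("CREATE TABLE keyed_events (id INTEGER PRIMARY KEY, payload TEXT)")

def load_batch(conn):
    """Simulate one delivery of the same single-record batch."""
    # Plain insert: every retransmission adds another row.
    conn.execute("INSERT INTO plain_events VALUES (1, 'login')")
    # Keyed write: a retransmission replaces the row with identical data.
    conn.execute("INSERT OR REPLACE INTO keyed_events VALUES (1, 'login')")

# Deliver the same batch three times, as a retrying transport might.
for _ in range(3):
    load_batch(conn)

plain_rows = conn.execute("SELECT COUNT(*) FROM plain_events").fetchone()[0]
keyed_rows = conn.execute("SELECT COUNT(*) FROM keyed_events").fetchone()[0]
print(plain_rows, keyed_rows)  # 3 1
```

Keying writes on a natural identifier is one common way to make retransmissions harmless; whether it suits a given data source depends on whether such an identifier exists.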


Aggregation

The data aggregation process combines multiple data elements. In the context of data ingress, this can be useful because moving large quantities of small files into HDFS potentially translates into NameNode memory woes, as well as slow MapReduce execution times. Having the ability to aggregate files or data together mitigates this problem and is a feature to consider.

Data format transformation

The data format transformation process converts one data format into another. Often your source data isn’t in a format that’s ideal for processing in tools such as MapReduce. If your source data is in multiline XML or JSON form, for example, you may want to consider a preprocessing step. This would convert the data into a form that can be split, such as one JSON or XML element per line, or convert it into a format such as Avro. Chapter 3 contains more details on these data formats.
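As an illustration of such a preprocessing step, the following sketch converts a multiline JSON array into one compact JSON object per line. It assumes the input is small enough to parse in memory; the function name and the sample records are hypothetical.

```python
import json

def to_json_lines(multiline_json):
    """Convert a JSON array spread over many lines into one compact object per line."""
    records = json.loads(multiline_json)
    # Compact separators drop the pretty-printing whitespace, and json.dumps
    # escapes newlines inside strings, so each record stays on a single line.
    return "\n".join(
        json.dumps(record, separators=(",", ":"), sort_keys=True)
        for record in records
    )

pretty = """
[
  {
    "host": "web01",
    "status": 200
  },
  {
    "host": "web02",
    "status": 500
  }
]
"""

lines = to_json_lines(pretty)
print(lines)
# {"host":"web01","status":200}
# {"host":"web02","status":500}
```

With one record per line, a line-oriented input format can split the file at any line boundary without breaking a record.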


Compression

Compression not only helps by reducing the footprint of data at rest, but also has I/O advantages when reading and writing data.

Availability and recoverability

Recoverability allows an ingress or egress tool to retry in the event of a failed operation. Because it’s unlikely that any data source, sink, or Hadoop itself can be 100% available, it’s important that an ingress or egress action be retried in the event of failure.
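A retry loop is the simplest form of recoverability. The sketch below assumes that failures surface as OSError and that the wrapped operation is safe to repeat (see the earlier discussion of idempotence); the helper name, delays, and the simulated transfer are all hypothetical.

```python
import time

def with_retries(operation, attempts=3, base_delay=0.01, sleep=time.sleep):
    """Run operation(); on failure wait and retry, doubling the delay each time."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except OSError:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2 ** (attempt - 1))

# Hypothetical transfer that fails twice before succeeding.
calls = {"count": 0}

def flaky_transfer():
    calls["count"] += 1
    if calls["count"] < 3:
        raise OSError("simulated network failure")
    return "transferred"

result = with_retries(flaky_transfer)
print(result, calls["count"])  # transferred 3
```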

Reliable data transfer and data validation

In the context of data transportation, checking for correctness is how you verify that no data corruption occurred while the data was in transit. When you work with heterogeneous systems, as you do in Hadoop data ingress and egress, the fact that data is being transported across different hosts, networks, and protocols only increases the potential for problems during data transfer. A common method for checking the correctness of raw data, such as data on storage devices, is the cyclic redundancy check (CRC), which is what HDFS uses internally to maintain block-level integrity.

In addition, it’s possible that there are problems in the source data itself due to bugs in the software generating the data. Performing these checks at ingress time allows you to do a one-time check, instead of dealing with all the downstream consumers of the data that would have to be updated to handle errors in the data.
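The idea behind a checksum comparison can be sketched with the CRC-32 function in Python's zlib module. This illustrates the general technique only; it is not HDFS's internal checksum code, and the payloads are made up for the example.

```python
import zlib

def crc32_of(data):
    """Return the CRC-32 of a byte string as an unsigned integer."""
    return zlib.crc32(data) & 0xFFFFFFFF

sent = b"the cat sat on the mat\n"
expected = crc32_of(sent)                    # computed at the source

received_ok = sent                           # arrived intact
received_bad = b"the cat sat on the hat\n"   # one byte corrupted in transit

print(crc32_of(received_ok) == expected)     # True
print(crc32_of(received_bad) == expected)    # False
```

The source computes the checksum before sending, the destination recomputes it on arrival, and a mismatch signals that the transfer should be repeated.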

Resource consumption and performance

Resource consumption and performance are measures of system resource utilization and system efficiency, respectively. Ingress and egress tools don’t typically impose significant load (resource consumption) on a system, unless you have appreciable data volumes. For performance, the questions to ask include whether the tool performs ingress and egress activities in parallel, and if so, what mechanisms it provides to tune the amount of parallelism. For example, if your data source is a production database and you’re using MapReduce to ingest that data, don’t use a large number of concurrent map tasks to import data.


Monitoring

Monitoring ensures that functions are performing as expected in automated systems. For data ingress and egress, monitoring breaks down into two elements: ensuring that the processes involved in ingress and egress are alive, and validating that source and destination data are being produced as expected. Monitoring should also include verifying that the data volumes being moved are at expected levels; unexpected drops or highs in your data will alert you to potential system issues or bugs in your software.
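A volume check of this kind can be very small. The following sketch compares the current volume against the average of recent volumes; the 50% tolerance and the sample numbers are arbitrary assumptions, and a real deployment would choose thresholds from its own history.

```python
def volume_alert(history, today, tolerance=0.5):
    """Return 'drop', 'spike', or None by comparing today's volume to the recent average."""
    baseline = sum(history) / len(history)
    if today < baseline * (1 - tolerance):
        return "drop"
    if today > baseline * (1 + tolerance):
        return "spike"
    return None

recent_daily_bytes = [100, 110, 90, 100]      # hypothetical daily ingest volumes
print(volume_alert(recent_daily_bytes, 105))  # None
print(volume_alert(recent_daily_bytes, 20))   # drop
print(volume_alert(recent_daily_bytes, 400))  # spike
```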

Speculative execution

MapReduce has a feature called speculative execution that launches duplicate tasks near the end of a job for tasks that are still executing. This helps prevent slow hardware from impacting job execution times. But if you’re using a map task to perform inserts into a relational database, for example, you should be aware that you could have two parallel processes inserting the same data.[2]

2 Map- and reduce-side speculative execution can be disabled via the mapreduce.map.speculative and mapreduce.reduce.speculative configurables in Hadoop 2.

On to the techniques. Let’s start with how you can leverage Hadoop’s built-in ingress mechanisms.

5.2. Moving data into Hadoop

The first step in working with data in Hadoop is to make it available to Hadoop. There are two primary methods that can be used to move data into Hadoop: writing external data at the HDFS level (a data push), or reading external data at the MapReduce level (more like a pull). Reading data in MapReduce has advantages in the ease with which the operation can be parallelized and made fault tolerant. Not all data is accessible from MapReduce, however, such as in the case of log files, which is where other systems need to be relied on for transportation, including HDFS for the final data hop.

In this section we’ll look at methods for moving source data into Hadoop. I’ll use the design considerations in the previous section as the criteria for examining and understanding the different tools.

We’ll get things started with a look at some low-level methods you can use to move data into Hadoop.

5.2.1. Roll your own ingest

Hadoop comes bundled with a number of methods to get your data into HDFS. This section will examine various ways that these built-in tools can be used for your data movement needs. The first and potentially easiest tool you can use is the HDFS command line.


Picking the right ingest tool for the job

The low-level tools in this section work well for one-off file movement activities, or when working with legacy data sources and destinations that are file-based. But moving data in this way is quickly being made obsolete by the availability of tools such as Flume and Kafka (covered later in this chapter), which offer automated data movement pipelines.

Kafka is a much better platform for getting data from A to B (and B can be a Hadoop cluster) than the old-school “let’s copy files around!” With Kafka, you only need to pump your data into Kafka, and you have the ability to consume the data in real time (such as via Storm) or in offline/batch jobs (such as via Camus).

File-based ingestion flows are, to me at least, a relic of the past (because everybody knows how scp works :-P), and they primarily exist for legacy reasons—the upstream data sources may have existing tools to create file snapshots (such as dump tools for the database), and there’s no infrastructure to migrate or move the data into a real-time messaging system such as Kafka.


Technique 33 Using the CLI to load files

If you have a manual activity that you need to perform, such as moving the examples bundled with this book into HDFS, then the HDFS command-line interface (CLI) is the tool for you. It’ll allow you to perform most of the operations that you’re used to performing on a regular Linux filesystem. In this section we’ll focus on copying data from a local filesystem into HDFS.


You want to copy files into HDFS using the shell.


The HDFS command-line interface can be used for one-off moves, or it can be incorporated into scripts for a series of moves.


Copying a file from local disk to HDFS is done with the hadoop command:

$ hadoop fs -put local-file.txt hdfs-file.txt

The behavior of the Hadoop -put command differs from the Linux cp command—in Linux if the destination already exists, it is overwritten; in Hadoop the copy fails with an error:

put: `hdfs-file.txt': File exists

The -f option must be added to force the file to be overwritten:

$ hadoop fs -put -f local-file.txt hdfs-file.txt

Much like with the Linux cp command, multiple files can be copied using the same command. In this case, the final argument must be the directory in HDFS into which the local files are copied:

$ hadoop fs -put local-file1.txt local-file2.txt /hdfs/dest/

You can also use Linux pipes to pipe the output of a command into an HDFS file—use the same -put command and add a separate hyphen after it, which tells Hadoop to read the input from standard input:

$ echo "the cat sat on the mat" | hadoop fs -put - hdfs-file.txt

To test whether a path exists, use the -test command with the -e option; to test whether a path is a directory, use the -d option. The exit code of the command is 0 if the test succeeds, and 1 if it doesn’t:

$ hadoop fs -test -e hdfs-file.txt

$ echo $?


$ hadoop fs -touchz hdfs-file.txt

$ hadoop fs -test -e hdfs-file.txt

$ echo $?


$ hadoop fs -test -d hdfs-file.txt

$ echo $?


If all you want to do is “touch” a file in HDFS (create a new empty file), the -touchz option is what you’re looking for:

$ hadoop fs -touchz hdfs-file.txt

There are many more operations supported by the fs command—to see the full list, run the command without any options:

$ hadoop fs

Usage: hadoop fs [generic options]

    [-appendToFile <localsrc> ... <dst>]

    [-cat [-ignoreCrc] <src> ...]

    [-checksum <src> ...]

    [-chgrp [-R] GROUP PATH...]

    [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]

    [-chown [-R] [OWNER][:[GROUP]] PATH...]

    [-copyFromLocal [-f] [-p] <localsrc> ... <dst>]

    [-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]

    [-count [-q] <path> ...]

    [-cp [-f] [-p] <src> ... <dst>]

    [-createSnapshot <snapshotDir> [<snapshotName>]]

    [-deleteSnapshot <snapshotDir> <snapshotName>]

    [-df [-h] [<path> ...]]

    [-du [-s] [-h] <path> ...]


    [-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]

    [-getmerge [-nl] <src> <localdst>]

    [-help [cmd ...]]

    [-ls [-d] [-h] [-R] [<path> ...]]

    [-mkdir [-p] <path> ...]

    [-moveFromLocal <localsrc> ... <dst>]

    [-moveToLocal <src> <localdst>]

    [-mv <src> ... <dst>]

    [-put [-f] [-p] <localsrc> ... <dst>]

    [-renameSnapshot <snapshotDir> <oldName> <newName>]

    [-rm [-f] [-r|-R] [-skipTrash] <src> ...]

    [-rmdir [--ignore-fail-on-non-empty] <dir> ...]

    [-setrep [-R] [-w] <rep> <path> ...]

    [-stat [format] <path> ...]

    [-tail [-f] <file>]

    [-test -[defsz] <path>]

    [-text [-ignoreCrc] <src> ...]

    [-touchz <path> ...]

    [-usage [cmd ...]]

The CLI is designed for interactive HDFS activities, and it can also be incorporated into scripts for some tasks you wish to automate. The disadvantage of the CLI is that it’s low-level and doesn’t have any automation mechanisms built in, so you’ll need to look elsewhere if that’s your goal. It also requires a fork for each command, which may be fine if you’re using it in a bash script, but it likely isn’t what you want to use if you’re trying to integrate HDFS functionality into a Python or Java application. In that case, the overhead of launching an external process for each command, in addition to the brittle nature of launching and interacting with an external process, is likely something you’ll want to avoid.
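To make the per-command process cost concrete, here is a rough sketch of what driving the CLI from Python looks like. The helper names are hypothetical; the only Hadoop-specific parts are the hadoop fs -put command and its -f flag shown earlier. Every call to put() launches a separate hadoop client process.

```python
import subprocess

def put_command(local_paths, hdfs_dest, overwrite=False):
    """Build the argument list for 'hadoop fs -put'."""
    args = ["hadoop", "fs", "-put"]
    if overwrite:
        args.append("-f")  # force overwrite of an existing destination
    return args + list(local_paths) + [hdfs_dest]

def put(local_paths, hdfs_dest, overwrite=False):
    """Run the command; raises CalledProcessError on a non-zero exit code."""
    subprocess.run(put_command(local_paths, hdfs_dest, overwrite), check=True)

print(put_command(["local-file1.txt", "local-file2.txt"], "/hdfs/dest/"))
# ['hadoop', 'fs', '-put', 'local-file1.txt', 'local-file2.txt', '/hdfs/dest/']
```

Error handling is limited to the exit code and whatever the process prints, which is part of what makes this approach brittle inside a larger application.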

The next technique is more suited to working with HDFS in programming languages such as Python.

Technique 34 Using REST to load files

The CLI is handy for quickly running commands and for scripting. However, it incurs the overhead of forking a separate process for each command, which is overhead that you’ll probably want to avoid, especially if you’re interfacing with HDFS in a programming language. This technique covers working with HDFS in languages other than Java (which is covered in a subsequent section).


You want to be able to interact with HDFS from a programming language that doesn’t have a native interface to HDFS.


Use Hadoop’s WebHDFS interface, which offers a full-featured REST API for HDFS operations.


Before you get started, you’ll need to make sure WebHDFS is enabled on your cluster (by default it’s not). This is governed by the dfs.webhdfs.enabled property. If it’s not enabled, you’ll need to update hdfs-site.xml and add the following:

<property>
  <name>dfs.webhdfs.enabled</name>
  <value>true</value>
</property>





In this technique, we’ll cover running WebHDFS on an unsecured Hadoop cluster.[3] If you’re working on a secure Hadoop cluster, you won’t supply the user.name argument; instead you’ll authenticate with Kerberos using kinit prior to interacting with WebHDFS, and then supply --negotiate -u:youruser in the curl command line.

3 In an unsecured Hadoop cluster (which is the default setup), any user can masquerade as another user in the cluster. This is especially problematic with WebHDFS, which exposes the username directly in the URL, making it trivial to enter some other user’s name. Hadoop security in the form of Kerberos will prevent this from happening because it requires that users be authenticated via LDAP or Active Directory prior to interacting with Hadoop.


Warning: Running WebHDFS on an unsecured cluster

If WebHDFS is enabled for a cluster where security is turned off, then it can easily be used to run commands as arbitrary users in your cluster (simply change the username in the URL to be any user in the cluster). It’s recommended that you only run WebHDFS with security turned on.


Because you’re using HTTP to communicate with the NameNode in this technique, you’ll need to know the host and port that the NameNode HTTP service is running on. This is configured with the dfs.namenode.http-address property. In a pseudo-distributed setup, this is most likely left at its default value. We’ll assume a pseudo-distributed setup for the rest of this technique; substitute the appropriate host and port for your setup.

You can start by creating a file in HDFS using the CLI:

$ echo "the cat sat on the mat" | hadoop fs -put - /tmp/hdfs-file.txt

You can use WebHDFS to get at all sorts of interesting metadata about the file (replace aholmes in the following URL with your username):

$ curl -L "
















The syntax for commands is composed of two parts: first the path, followed by the operation being performed. You also need to supply the username that you wish to execute the operation as; otherwise HDFS will assume you’re an anonymous user with restricted access. Figure 5.1 highlights these parts of the URL path.

Figure 5.1. Dissecting the WebHDFS URL path
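The URL structure can also be expressed as a small helper. This is a sketch under stated assumptions: the /webhdfs/v1 prefix, the op parameter, and the user.name parameter come from the WebHDFS REST API, GETFILESTATUS is one operation that returns file metadata, and localhost:50070 is assumed to be the NameNode HTTP address (the Hadoop 2 default port). The function name is hypothetical.

```python
from urllib.parse import quote, urlencode

def webhdfs_url(host, port, hdfs_path, op, user=None, **params):
    """Build a WebHDFS URL: http://<host>:<port>/webhdfs/v1<path>?op=<OP>&..."""
    query = {"op": op}
    if user is not None:
        query["user.name"] = user  # only meaningful on an unsecured cluster
    query.update(params)
    return "http://%s:%d/webhdfs/v1%s?%s" % (
        host, port, quote(hdfs_path), urlencode(query))

print(webhdfs_url("localhost", 50070, "/tmp/hdfs-file.txt",
                  "GETFILESTATUS", user="aholmes"))
# http://localhost:50070/webhdfs/v1/tmp/hdfs-file.txt?op=GETFILESTATUS&user.name=aholmes
```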

Reading the file from HDFS is just a matter of specifying OPEN as the operation:

$ curl -L "


the cat sat on the mat

Writing a file using WebHDFS is a two-step process. The first step informs the NameNode of your intent to create a new file. You do that with an HTTP PUT command:

$ echo "the cat sat on the mat" > local.txt

$ curl -i -X PUT "




Location: http://localhost.localdomain:50075/webhdfs/v1/tmp/




At this point, the file hasn’t been written yet—you just gave the NameNode the opportunity to determine which DataNode you’ll be writing to, which was specified in the “Location” header in the response. You’ll need to grab that URL and then issue a second HTTP PUT to perform the actual write:

$ curl -i -X PUT -T local.txt \




You can verify that the write was successful with a read of the file:

$ hadoop fs -cat /tmp/new-file.txt

the cat sat on the mat
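The same two-step write can be sketched in Python using only the standard library. This is an illustration, not a production client: it assumes an unsecured cluster, that the NameNode answers the first PUT with a 307 redirect carrying a Location header, and that the DataNode answers the second PUT with 201 Created, as described in the WebHDFS REST API documentation. The function names are hypothetical.

```python
from http.client import HTTPConnection
from urllib.parse import urlsplit

def http_put(url, body=None):
    """Send one PUT without following redirects; return (status, Location header)."""
    parts = urlsplit(url)
    conn = HTTPConnection(parts.hostname, parts.port)
    target = parts.path + ("?" + parts.query if parts.query else "")
    conn.request("PUT", target, body=body)
    resp = conn.getresponse()
    resp.read()
    status, location = resp.status, resp.getheader("Location")
    conn.close()
    return status, location

def webhdfs_create(create_url, data, put=http_put):
    """Two-step WebHDFS write: ask the NameNode, then send data to the DataNode."""
    # Step 1: no payload; the NameNode answers with a redirect to a DataNode.
    status, location = put(create_url)
    if status != 307 or not location:
        raise RuntimeError("expected a 307 redirect, got %s" % status)
    # Step 2: send the file contents to the URL from the Location header.
    status, _ = put(location, data)
    if status != 201:
        raise RuntimeError("expected 201 Created, got %s" % status)
    return location
```

The put parameter exists so the transport can be swapped out, for example to add authentication or to exercise the logic without a cluster.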

WebHDFS supports all the HDFS operations that you can perform using the regular command line,[4] and it’s more useful because it gives you access to metadata in a structured JSON form, which makes it easier to parse the data.

4 See the WebHDFS REST API page (http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html) for the full set of operations you can perform.

It’s worth mentioning some additional features provided by WebHDFS. First, data locality is present for the first block of the file. The NameNode redirects the client to the DataNode that hosts the first block, giving you strong data locality. For subsequent blocks in the file, the DataNode acts as a proxy and streams data to and from the node that holds the block’s data.

WebHDFS is also integrated with Hadoop’s secure authentication, meaning that you can enable Kerberos and use the delegation tokens in your HTTP requests. Additionally, the API will maintain wire-level compatibility across Hadoop releases, meaning that the commands you issue today will work with future versions of Hadoop (and vice versa). This is a useful tool for accessing multiple clusters running different versions of Hadoop.

There are several projects that provide WebHDFS libraries in various languages (listed in table 5.1) to make it easier for you to get up and running with it.[5]

5 In fact, a new C client was written in Hadoop 2 called libwebhdfs to leverage WebHDFS. See https://issues.apache.org/jira/browse/HDFS-2656.

Table 5.1. WebHDFS libraries

Language

Link

C

libwebhdfs (bundled with Hadoop)

Ruby

https://github.com/kzk/webhdfs and https://rubygems.org/gems/webhdfs

WebHDFS is useful when the client has access to all the NameNodes and DataNodes. In locked-down environments, this may not be the case, and you may need to look at HttpFS.

Technique 35 Accessing HDFS from behind a firewall

Production Hadoop environments are often locked down to protect the data in these clusters. Part of the security procedures could include putting your cluster behind a firewall, which is a nuisance if you’re trying to read from or write to HDFS from outside of the firewall. This technique looks at the HttpFS gateway, which can provide HDFS access using HTTP (which is often opened up on firewalls).


You want to write to HDFS, but there’s a firewall restricting access to the NameNode and/or the DataNodes.


Use the HttpFS gateway, which is a standalone server that provides access to HDFS over HTTP. Because it’s a separate service and it’s HTTP, it can be configured to run on any host that has access to the Hadoop nodes, and you can open a firewall rule to allow traffic to the service.


HttpFS is useful because not only does it allow you to use REST to access HDFS, but it has a complete Hadoop filesystem implementation, which means you can use the CLI and native HDFS Java clients to talk to HDFS, as shown in figure 5.2.

Figure 5.2. The HttpFS gateway architecture

To get HttpFS up and running, you’re going to have to designate a proxy user. This is the user that will run the HttpFS process, and this user will also be configured in Hadoop as the proxy user.

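Suppose you have a user called foxyproxy that you’re going to designate as your proxy user. You’d update your core-site.xml with this:

<property>
  <name>hadoop.proxyuser.foxyproxy.hosts</name>
  <value>localhost</value>
</property>
<property>
  <name>hadoop.proxyuser.foxyproxy.groups</name>
  <value>*</value>
</property>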

Basically, what you’ve done here is indicate that Hadoop should only accept proxy requests from host localhost, and that foxyproxy can impersonate any user (you can lock down the set of users that can be impersonated by supplying a comma-separated list of group names). Change the username, host, and group values so that they make sense in your environment.

Once you’ve made your changes to core-site.xml, you’ll have to bounce Hadoop. Next you’ll need to start the HttpFS process:

$ sbin/httpfs.sh start

Now you can issue the same curl commands that you used in the previous technique with WebHDFS. This is one of the nice things about the HttpFS gateway—the syntax is exactly the same. To perform a directory listing on the root directory, you’d do the following:

$ curl -i "http://localhost:14000/webhdfs/v1/?user.name=poe&


HTTP/1.1 200 OK

Server: Apache-Coyote/1.1

Set-Cookie: hadoop.auth="u=poe&p=poe&t=simple&e=13...

Content-Type: application/json

Transfer-Encoding: chunked

Date: Fri, 10 Jan 2014 00:09:00 GMT












The only difference between this curl command and the ones you used in the previous technique is the port number. HttpFS by default runs on port 14000, but this can be changed by editing httpfs-env.sh. Some of the more interesting properties that can be changed in the file are shown in table 5.2.

Table 5.2. HttpFS properties


Default value




The HTTP port that HttpFS listens on.



The admin port for HttpFS.



The logs directory for HttpFS.


`hostname -f`

The command used to determine which host HttpFS is running on. This information is passed to the NameNode so it can compare this to the value of hadoop.proxyuser.${user}.hosts that you configured earlier in core-site.xml.

There are additional Kerberos and user- and group-level settings that can be configured in httpfs-site.xml.[6]

6 Consult the “HttpFS configuration properties” web page at http://hadoop.apache.org/docs/stable/hadoop-hdfs-httpfs/httpfs-default.html.

Differences between WebHDFS and HttpFS

The primary difference between WebHDFS and HttpFS is whether the client needs direct access to all the data nodes. If your client has access to all the data nodes, then WebHDFS will work for you, as reading and writing files involves the client talking directly to the data nodes for data transfer. On the other hand, if you’re behind a firewall, your client probably doesn’t have access to all the data nodes, in which case the HttpFS option will work best for you. With HttpFS, the server will talk to the data nodes, and your client just needs to talk to the single HttpFS server.

If you have a choice, pick WebHDFS because there’s an inherent advantage in clients talking directly to the data nodes—it allows you to easily scale the number of concurrent clients across multiple hosts without hitting the network bottleneck of all the data being streamed via the HttpFS server. This is especially true if your clients are running on the data nodes themselves, as you’ll be using the data locality benefits of WebHDFS by directly streaming any locally hosted HDFS data blocks from the local filesystem instead of over the network.

Technique 36 Mounting Hadoop with NFS

Often it’s a lot easier to work with Hadoop data if it’s accessible as a regular mount on your filesystem. This allows you to use existing scripts, tools, and programming languages to interact with your data in HDFS. This section looks at how you can easily copy data in and out of HDFS using an NFS mount.


You want to treat HDFS as a regular Linux filesystem and use standard Linux tools to interact with HDFS.


Use Hadoop’s NFS implementation to access data in HDFS.


Prior to Hadoop 2.1, the only way to mount HDFS was with FUSE. It wasn’t recommended for general use due to various performance and reliability issues. It also introduced the additional burden of requiring the driver to be installed on any client machine (in other words, it didn’t provide an NFS gateway).

The new NFS implementation in Hadoop addresses all of the shortcomings with the old FUSE-based system. It’s a proper NFSv3 implementation, and it allows you to run one or more NFS gateways for increased availability and throughput.

Figure 5.3 shows the various Hadoop NFS components in action.

Figure 5.3. Hadoop NFS

To get the NFS services up and running, you’ll first need to stop the NFS services running on your host. On Linux systems this can be achieved with the following commands:

$ service portmap stop

$ service nfs stop

$ service rpcbind stop

Next you need to start the Hadoop NFS services. The first service you’ll launch is portmap, which provides a registry service for protocols and their associated transports and ports. It runs on a restricted port, so it needs to be launched as a root user:

$ sudo hadoop-daemon.sh start portmap

Next you need to start the actual NFS service. It’s important that the user running this service be the same user that you use to run HDFS:

$ hadoop-daemon.sh start nfs3

Verify that the services are running by running rpcinfo and showmount—you should see output similar to the following:

$ /usr/sbin/rpcinfo -p localhost

   program vers proto   port

    100005    1   tcp   4242  mountd

    100000    2   udp    111  portmapper

    100005    3   tcp   4242  mountd

    100005    2   udp   4242  mountd

    100003    3   tcp   2049  nfs

    100000    2   tcp    111  portmapper

    100005    3   udp   4242  mountd

    100005    1   udp   4242  mountd

    100005    2   tcp   4242  mountd

$ /usr/sbin/showmount -e localhost

Export list for localhost:

/ *

Now you need to mount HDFS on a directory on your host. In the following example I’ve picked /hdfs as the mount directory. The second mount command verifies that the mount has been created:

$ sudo mkdir /hdfs

$ sudo mount -t nfs -o vers=3,proto=tcp,nolock localhost:/  /hdfs

$ mount | grep hdfs

localhost:/ on /hdfs type nfs (rw,nfsvers=3,proto=tcp,nolock,


You’re all set! Now you can manipulate HDFS directly using the mounted filesystem.
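Once the mount exists, ordinary file APIs are all that is needed. The following sketch copies a local file under the mount using Python's standard library; the helper name and the /tmp/ingest target directory are hypothetical, and /hdfs is the mount point chosen above.

```python
import os
import shutil

def copy_into_mount(local_path, mount_root, hdfs_dir):
    """Copy a local file under an NFS-mounted HDFS root using ordinary file APIs."""
    dest_dir = os.path.join(mount_root, hdfs_dir.lstrip("/"))
    os.makedirs(dest_dir, exist_ok=True)
    # shutil.copy writes the destination from start to end in one pass
    # and returns the path of the newly created file.
    return shutil.copy(local_path, dest_dir)

# Example, assuming the /hdfs mount created above:
# copy_into_mount("local-file.txt", "/hdfs", "/tmp/ingest")
```

A whole-file copy like this writes sequentially, which should fit the gateway better than tools that seek backward and rewrite parts of a file.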

There are a few things to consider when using the NFS gateway:

·        HDFS is an append-only filesystem. You can append to files, but you can’t perform random writes. If you have a hard-and-fast requirement that you need to work with Hadoop using a filesystem that supports random writes, you should take a look at MapR’s distribution of Hadoop.

·        The NFS gateway in Hadoop version 2.2 doesn’t support secure Hadoop (Kerberos), and there’s an open ticket to add that support.[7]

7 Kerberos support in the NFS gateway is being tracked in https://issues.apache.org/jira/browse/HDFS-5539.

·        Support for proxy users isn’t available until Hadoop 2.4 (or 3). This essentially means that previous versions of Hadoop will execute all commands as superuser, because there’s a requirement that the NFS gateway run as the same user as HDFS itself.

Due to these restrictions, it’s advised that the NFS gateway be reserved for experimental use, or for use in a single-tenant cluster where user-level security isn’t a concern.

Technique 37 Using DistCp to copy data within and between clusters

Imagine that you have a large amount of data you want to move into or out of Hadoop. With most of the techniques in this section, you have a bottleneck because you’re funneling the data through a single host, which is the host on which you’re running the process. To optimize data movement as much as possible, you want to leverage MapReduce to copy data in parallel. This is where DistCp comes into play, and this technique examines several ways that you can use DistCp to efficiently copy data between Hadoop clusters, as well as into and out of an NFS mount.


You want to efficiently copy large amounts of data between Hadoop clusters and have the ability for incremental copies.


Use DistCp, a parallel file-copy tool built into Hadoop.


In this section, we’ll start by covering the important configuration aspects of DistCp. After that, we’ll go on to look at specific scenarios where you’ll want to use DistCp, and the best way to configure and run it.


DistCp version 2

This technique covers the newer version of DistCp available in Hadoop 2, called DistCp 2. This code was backported into Hadoop 1.2.0 and is available by using distcp2 as the command—on Hadoop 2 it replaces the existing DistCp so the normal distcp command can be used.

DistCp 2 supports the same set of command-line arguments as the legacy version of DistCp, but brings with it a number of useful advantages:

·        Reduced setup and execution time when working with a large number of files, as the driver no longer needs to preprocess all the inputs (this is now deferred to the mappers).

·        It now has a full-featured Java interface and removes the need for Java clients to serialize arguments into strings.

·        Atomic commits allow all-or-none copying semantics.

·        Using option -update to skip files that already exist in the destination will result in file attributes being changed if they differ from the source files.

·        Empty directories are no longer skipped as part of the copy.


DistCp utilizes a map-only MapReduce job to perform a copy. A very simple example follows, where it’s used within a single Hadoop cluster to copy the source directory, /hello, into a destination directory, /world:

$ hadoop distcp /hello /world

This command will create the /world directory if it doesn’t already exist, and then copy the contents of /hello (all its files and directories recursively) into /world. You may be wondering how DistCp deals with files that already exist in the destination—keep on reading for details.

Dealing with destination files that already exist

Files and directories that already exist in the destination are left untouched (even if the files are different). You can change this behavior by adding the arguments shown in table 5.3.

Table 5.3. DistCp arguments that impact where files are copied, and the behavior should destination files preexist

Argument

Description

None (neither -update nor -overwrite)

Source files are never recopied if the destination already exists.

-update

Source files are recopied if any of the following are true:

·        Source and destination file sizes are different.

·        Source and destination file CRCs don’t match.[a]

·        Source and destination file block sizes don’t match.

-overwrite

Source files are always recopied if the destination file already exists.

a File CRC checks can be turned off with the -skipcrccheck argument.
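The rules in table 5.3 can be expressed as a small decision function. The following is a minimal sketch of that logic only; the class, enum, and FileInfo type are hypothetical names invented for illustration, and this isn't DistCp's actual source code:

```java
/** Illustrative model of the rules in table 5.3; not DistCp's actual source code. */
final class CopyDecision {

    enum Mode { NONE, UPDATE, OVERWRITE }

    /** Minimal stand-in for the file attributes the table compares (hypothetical type). */
    static final class FileInfo {
        final long size;
        final long checksum;
        final long blockSize;

        FileInfo(long size, long checksum, long blockSize) {
            this.size = size;
            this.checksum = checksum;
            this.blockSize = blockSize;
        }
    }

    /**
     * Returns true if the source file should be copied.
     * A null destination means the file doesn't exist there yet.
     */
    static boolean shouldCopy(Mode mode, FileInfo src, FileInfo dst, boolean skipCrcCheck) {
        if (dst == null) {
            return true; // nothing at the destination, so always copy
        }
        switch (mode) {
            case OVERWRITE:
                return true; // always recopy
            case UPDATE:
                // recopy if size, block size, or (unless skipped) CRC differ
                return src.size != dst.size
                        || src.blockSize != dst.blockSize
                        || (!skipCrcCheck && src.checksum != dst.checksum);
            default:
                return false; // destination exists and is left untouched
        }
    }

    public static void main(String[] args) {
        FileInfo src = new FileInfo(1024, 0xCAFEL, 128L * 1024 * 1024);
        FileInfo sameSizeDifferentCrc = new FileInfo(1024, 0xBEEFL, 128L * 1024 * 1024);
        System.out.println(shouldCopy(Mode.NONE, src, sameSizeDifferentCrc, false));
        System.out.println(shouldCopy(Mode.UPDATE, src, sameSizeDifferentCrc, false));
        System.out.println(shouldCopy(Mode.UPDATE, src, sameSizeDifferentCrc, true));
        System.out.println(shouldCopy(Mode.OVERWRITE, src, sameSizeDifferentCrc, false));
    }
}
```

Note how skipping the CRC check means a file whose contents changed without changing its size would be treated as up to date.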

You can see the number of files that are skipped by looking at the SKIP counter that’s dumped to standard output when the job completes:




Another factor to understand about the -update and -overwrite arguments is that they subtly change the behavior of what is copied. Without these options, if the source is a directory, that directory is created under the destination directory. With either the -update or -overwrite arguments, only the files and subdirectories are copied, and not the source directory. This is best demonstrated with an example:

# create a source directory and file

$ hadoop fs -mkdir /src

$ hadoop fs -touchz /src/file1.txt

# create a destination directory

$ hadoop fs -mkdir /dest

# run a distcp without any options

$ hadoop distcp /src /dest

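$ hadoop fs -ls -R /dest

# the listing shows /dest/src and /dest/src/file1.txt,

# because the /src directory itself was created under /dest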

# now run the same command again with

# the -update argument

$ hadoop distcp -update /src /dest

$ hadoop fs -ls -R /dest

# this time only the contents of /src were copied,

# so the listing now also shows /dest/file1.txt


Ignoring errors

When you’re using DistCp to copy over a large number of files, it’s wise to execute the command with the -i flag to ignore errors. This way a single error won’t cause your entire copy process to fail, and you can reattempt to copy any failed files by reissuing the same DistCp command with the -update option.

Dynamic copy strategy

The default behavior for DistCp is to preallocate work for each mapper by evenly spreading all the files in such a way that all the mappers are copying approximately the same number of bytes. In theory, this sounds like a great way to fairly allocate work, but in reality, factors such as differing hardware, hardware errors, and poor configuration often result in long-tail job execution, where a handful of straggler mappers take much longer than the others.

With DistCp 2 you can use an alternative strategy, where the mappers pick up work directly as opposed to having it preallocated. This is called the dynamic copy strategy, and it’s activated with the -strategy dynamic argument. The net effect of adding this argument is improved copy times, as the faster mappers can pick up the slack of the slower mappers.
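To get a feel for why picking up work dynamically helps, here's a toy simulation. It's a sketch under simplifying assumptions (whole files as the unit of work, fixed mapper speeds, made-up numbers) and doesn't reproduce how DistCp actually implements either strategy:

```java
import java.util.Arrays;

/** Toy model comparing preallocated and dynamically claimed work; not DistCp code. */
final class CopyStrategySim {

    /** Preallocation: give each file to the mapper holding the fewest bytes, ignoring mapper speed. */
    static double staticMakespan(long[] fileSizes, double[] mapperSpeeds) {
        long[] assignedBytes = new long[mapperSpeeds.length];
        for (long size : fileSizes) {
            int target = 0;
            for (int m = 1; m < assignedBytes.length; m++) {
                if (assignedBytes[m] < assignedBytes[target]) {
                    target = m;
                }
            }
            assignedBytes[target] += size;
        }
        double makespan = 0;
        for (int m = 0; m < mapperSpeeds.length; m++) {
            makespan = Math.max(makespan, assignedBytes[m] / mapperSpeeds[m]);
        }
        return makespan;
    }

    /** Dynamic: whichever mapper becomes idle first takes the next file from a shared queue. */
    static double dynamicMakespan(long[] fileSizes, double[] mapperSpeeds) {
        double[] busyUntil = new double[mapperSpeeds.length];
        for (long size : fileSizes) {
            int next = 0;
            for (int m = 1; m < busyUntil.length; m++) {
                if (busyUntil[m] < busyUntil[next]) {
                    next = m;
                }
            }
            busyUntil[next] += size / mapperSpeeds[next];
        }
        return Arrays.stream(busyUntil).max().orElse(0);
    }

    public static void main(String[] args) {
        long[] files = new long[40];
        Arrays.fill(files, 100L);          // 40 files of 100 units each
        double[] speeds = {10, 10, 10, 1}; // the fourth mapper is a straggler
        System.out.println("preallocated: " + staticMakespan(files, speeds));
        System.out.println("dynamic:      " + dynamicMakespan(files, speeds));
    }
}
```

In the preallocated case the straggler is handed a quarter of the bytes and dictates the job's completion time; in the dynamic case it simply ends up copying fewer files.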

Atomic commits

Another useful feature in DistCp 2 is the notion of atomic commits. The default behavior of DistCp is for each file to be written to a temporary file and then moved to the final destination. This means there’s no way to undo the files that were already copied if the job later encounters an error.

Atomic commits therefore allow you to defer the actual “commit” until the end of the job when all files have been copied so that you don’t see any partial writes if an error is encountered. This feature can be enabled using the -atomic argument.
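The idea behind a deferred commit can be illustrated on a local filesystem with plain java.nio. This is only an analogy, assuming a staging directory that's renamed into place at the end; the class and method names are hypothetical, and DistCp's own mechanism operates on HDFS:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Local-filesystem analogy for all-or-nothing copying; not DistCp's implementation. */
final class AtomicCommitSketch {

    /**
     * Writes every file into a staging directory and only renames the staging
     * directory to finalDir once all writes have succeeded.
     */
    static void copyAllOrNothing(Map<String, String> files, Path stagingDir, Path finalDir)
            throws IOException {
        Files.createDirectories(stagingDir);
        try {
            for (Map.Entry<String, String> file : files.entrySet()) {
                Files.write(stagingDir.resolve(file.getKey()),
                        file.getValue().getBytes(StandardCharsets.UTF_8));
            }
            // The "commit": a single rename makes all the files visible at once.
            Files.move(stagingDir, finalDir, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            // Any failure discards the partial work, so finalDir never holds partial output.
            deleteRecursively(stagingDir);
            throw e;
        }
    }

    private static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> deepestFirst;
        try (Stream<Path> walk = Files.walk(dir)) {
            deepestFirst = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : deepestFirst) {
            Files.delete(path);
        }
    }

    public static void main(String[] args) throws IOException {
        Path work = Files.createTempDirectory("atomic-demo");
        Map<String, String> files = new LinkedHashMap<>();
        files.put("part-0", "first");
        files.put("part-1", "second");
        copyAllOrNothing(files, work.resolve("_staging"), work.resolve("dest"));
        System.out.println(Files.exists(work.resolve("dest").resolve("part-1")));
    }
}
```

The trade-off is the same one you face with -atomic: readers see nothing until the whole copy has finished.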

Parallelism and number of mappers

Currently the most granular unit of work for DistCp is at the file level. Therefore, only one mapper will be used to copy each file, regardless of how large the files are. Bumping up the number of mappers won’t speed up the copy of an individual large file, and any mappers beyond the number of files being copied will have nothing to do.

By default, DistCp runs with 20 mappers, and which files each mapper copies are determined by the copy strategy you have selected. The Hadoop developers put some thought into the default setting for the number of mappers—choosing the right value is a function of how much network bandwidth you want to utilize (discussed next), and how many tasks you want to occupy during the copy.

You can change the number of mappers by specifying -m followed by your desired value.


A final consideration worth mentioning is the network bandwidth used during a copy. Large copies can saturate and overwhelm the network between clusters. One way to keep on the good side of the network operations folks in your organization is to use the -bandwidth argument to specify a cap on the amount of bandwidth each map task consumes during a copy. The value for this argument is in megabytes per second (MBps).
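Because the cap applies to each map task, the ceiling on total bandwidth is the cap multiplied by the number of mappers. The arithmetic is sketched below; the 10 MBps cap and 1 TB data size are hypothetical values chosen for illustration:

```java
/** Back-of-the-envelope model of the -m and -bandwidth settings; not part of DistCp. */
final class DistCpBandwidthEstimate {

    /** Upper bound on aggregate throughput (MB/s) when every mapper runs at its cap. */
    static long aggregateCapMBps(int mappers, int perMapCapMBps) {
        return (long) mappers * perMapCapMBps;
    }

    /** Shortest possible copy time in seconds, assuming work spreads evenly across mappers. */
    static double minCopySeconds(long totalMB, int mappers, int perMapCapMBps) {
        return (double) totalMB / aggregateCapMBps(mappers, perMapCapMBps);
    }

    public static void main(String[] args) {
        int mappers = 20;            // DistCp's default number of mappers
        int perMapCapMBps = 10;      // hypothetical value passed to -bandwidth
        long totalMB = 1024L * 1024; // 1 TB expressed in MB

        System.out.println("Aggregate cap: "
                + aggregateCapMBps(mappers, perMapCapMBps) + " MB/s");
        System.out.println("Minimum copy time: "
                + minCopySeconds(totalMB, mappers, perMapCapMBps) + " s");
    }
}
```

With these numbers the copy can consume up to 200 MBps in total, so when you agree on a network budget, divide it by the value you pass to -m to get the per-map cap.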

Additional options

So far we’ve looked at some of the more interesting options in DistCp. To see the full list of options, you can run the distcp command without any options, or head on over to the online Hadoop docs at http://hadoop.apache.org/docs/r1.2.1/distcp2.html.

Copying data from an NFS mount into HDFS

DistCp may be a good fit if you have files sitting on a filer or a NAS that you want to copy into HDFS. This will only work if all the DataNodes have the data mounted, because the DistCp mappers running on the DataNodes require access to both the source and destination. The following example shows how you would perform the copy. Note the file scheme used to tell Hadoop that the local filesystem should be used as the source:

$ hadoop distcp file:///my_filer/source /dest

Copying data within the same cluster

In what situations would you use DistCp in place of a regular hadoop fs -cp command? The regular cp command is a single-threaded approach to copying data—it goes file by file, and streams the data from the server to the client and back out to the server. Compare that to DistCp, which launches a MapReduce job that uses multiple mappers to perform the copy. As a rule of thumb, you should use the regular copy process when dealing with tens of GBs and consider DistCp when working with hundreds of GBs or more.

When the same cluster is both the source and the destination, nothing special is required to qualify the source or destination:

$ hadoop distcp /source /dest

Copying between two clusters running the same version of Hadoop

Now let’s look at copying data between two clusters running the same version of Hadoop. This approach takes advantage of the fact that they’re both running the same version of Hadoop by using Hadoop-native filesystem reads and writes, which emphasize data locality. Unfortunately, Hadoop RPC requires the client and server versions to be identical, so this won’t work if the versions differ. In that situation you’ll need to skip to the next subsection.

Imagine that you have two HDFS setups, one running on nn1 and the other on nn2, and both NameNodes are running on the default RPC port.[8] Copying files from the /source to the /dest directories between the clusters would be achieved with the following command:

8 To figure out the actual host and port for each NameNode, examine the value of fs.default.name or fs.defaultFS in core-site.xml.

$ hadoop distcp hdfs://nn1:8020/source hdfs://nn2:8020/dest

With two clusters in play, you may be wondering which cluster you should use to run DistCp. If you have a firewall sitting between the clusters and ports can only be opened in one direction, then you’ll have to run the job on the cluster that has read or write access to the other cluster.

Next let’s look at how to run DistCp between clusters on different Hadoop versions.

Copying between clusters running different versions of Hadoop

The previous approach won’t work when your clusters are running different versions of Hadoop. Hadoop’s RPC doesn’t have backward or forward compatibility built into it, so a newer version of the Hadoop client can’t talk to an older version of a Hadoop cluster, and vice versa.

With recent versions of Hadoop, you have two options for the copy: the older HFTP and the newer WebHDFS. Let’s first look at the legacy method, HFTP.

HFTP is a version-independent interface on HDFS that uses HTTP as the transport mechanism. It offers a read-only view into HDFS, so you can only ever use it as the source in your DistCp. It’s enabled via the hftp scheme in the NameNode URI, as seen in the following example:

$ hadoop distcp hftp://nn1:50070/source hdfs://nn2:8020/dest

Look at hdfs-site.xml (and hdfs-default.xml if you don’t see it in hdfs-site.xml) to figure out the host and port to use for HFTP (specifically dfs.http.port, or dfs.namenode.http-address if it’s not set). If securing the data in transit is important to you, look at using the hsftp scheme (HFTPS), which uses HTTPS for transport (configure or examine dfs.hftp.https.port, which if not set will default to dfs.https.port for the port).

With HFTP(S), you’ll have to run the DistCp command on the destination cluster so that the HDFS writes are performed with the same Hadoop client version as the destination cluster. But what if this is too constraining for your environment? What if you have a firewall that doesn’t allow you to run DistCp on the destination? That’s where WebHDFS comes into play.

WebHDFS has the advantage over HFTP of providing both a read and write interface. You can use it for either the source or destination in your DistCp, as shown here:

$ hadoop distcp hdfs://nn1:8020/source webhdfs://nn2:50070/dest

WebHDFS has an additional benefit in the form of data locality—it uses HTTP redirection when reading and writing data so that reads and writes are performed with the actual DataNode that stores the data. It’s highly recommended that you use WebHDFS rather than HFTP for both its writing abilities and the performance improvements.

Examine the value of dfs.namenode.http-address to determine the host and port that you should use with WebHDFS.

Other destinations

DistCp works with any implementation of the Hadoop filesystem interface; table 5.4 shows the most popular implementations that are bundled with Hadoop.

Table 5.4. URI schemes and their related Hadoop filesystem implementations

Scheme

Details

hdfs

Provides native access to Hadoop’s own HDFS. The only downside is that backward and forward compatibility aren’t supported.

file

Used to read and write from the local filesystem.

hftp and hsftp

A legacy, read-only view on top of HDFS that emphasized API compatibility to support any version of Hadoop. It was the old-school way of copying data between clusters running different versions of Hadoop. hsftp provides an implementation that uses HTTPS for transport for added security.

webhdfs

Can be used with both WebHDFS (see technique 34) if your client has access to the Hadoop cluster, and the HttpFS gateway (see technique 35) for accessing HDFS from behind a firewall. This is the replacement for the read-only hftp implementation. It supports a read and write interface to HDFS. In addition, this filesystem can be used to read and write between different versions of Hadoop.

ftp

Uses FTP as the storage implementation.

s3 and s3n

Provides access to Amazon’s S3 filesystem. s3n provides native access to S3, whereas the s3 scheme stores data in a block-based manner to work around S3’s maximum file-size constraints.


DistCp is a powerful tool for moving data into and between Hadoop filesystems. Features such as incremental copies enable it to be used in a near-continuous fashion to synchronize directories on two systems. And its ability to copy data between Hadoop versions means that it’s a very popular way of synchronizing data across multiple Hadoop clusters.


Executing DistCp

When you’re running a DistCp command, it’s recommended that you execute it within a screen session,[9] or at least use nohup to redirect the output to a local file.

9 Screen is a Linux utility that manages virtual shells and allows them to persist even when the parent shell has terminated. Matt Cutts has an excellent overview on his site called “A quick tutorial on screen,” www.mattcutts.com/blog/a-quick-tutorial-on-screen/.


One limitation of DistCp is that it supports multiple source directories but only a single destination directory. This means you can’t use a single DistCp job to perform a one-directional synchronization between clusters (unless you only need to sync a single directory). In this situation, you could run multiple DistCp jobs, or you could run a single job and sync to a staging directory, and then follow up the copy with a fs -mv to move the staged files into the ultimate destinations.

Technique 38 Using Java to load files

Let’s say you’ve generated a number of Lucene indexes in HDFS and you want to pull them out to an external host. Maybe, as part of pulling the data out, you want to manipulate the files in some way using Java. This technique shows how the Java HDFS API can be used to read and write data in HDFS.


You want to incorporate writing to HDFS into your Java application.


Use the Hadoop Java API to access data in HDFS.


The HDFS Java API is nicely integrated with Java’s I/O model, which means you can work with regular InputStreams and OutputStreams for I/O. To perform filesystem-level operations such as creating, opening, and removing files, Hadoop has an abstract class called FileSystem, which is extended and implemented for specific filesystems that can be leveraged in Hadoop.

Earlier, in technique 33, you saw an example of how you can use the CLI to stream data from standard input to a file in HDFS:

$ echo "hello world" | hadoop fs -put - hdfs-file.txt

Let’s explore how to do that in Java. There are two main parts to writing the code that does this: getting a handle to the FileSystem and creating the file, and then copying the data from standard input to the OutputStream:

You can see how this code works in practice by running the following command:

$ echo "the cat" | hip hip.ch5.CopyStreamToHdfs --output test.txt

$ hadoop fs -cat test.txt

the cat

Let’s circle back into the code to understand how it worked. The following code snippet was used to get a handle to the FileSystem. But how did Hadoop know which concrete filesystem to return?

FileSystem fs = FileSystem.get(conf);

The key is in the conf object that’s passed into the get method. What’s happening is that the FileSystem class examines the value of the fs.defaultFS property,[10] which contains a URI identifying the filesystem that should be used. By default, this is configured to be the local filesystem (file:///), which is why if you try running Hadoop out of the box without any configuration, you’ll be using your local filesystem and not HDFS.

10 fs.default.name is the deprecated property used in Hadoop 1.

In a pseudo-distributed setup like the one in the appendix, one of the first things you would do is configure core-site.xml with an HDFS filesystem:





Hadoop takes the scheme from the URL (hdfs in the preceding example) and performs a lookup to discover the concrete filesystem. There are two ways that a filesystem can be discovered:

·        Built-in filesystems are automatically discovered, and their getScheme methods are called to determine their schemes. In the example of HDFS, the implementation class is org.apache.hadoop.hdfs.DistributedFileSystem and the getScheme method returns hdfs.

·        Filesystems that aren’t built into Hadoop can be identified by updating core-site.xml with fs.$scheme.impl, where $scheme would be replaced with the scheme identified in the URI.
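The two discovery paths can be modeled with a pair of lookups keyed on the URI scheme. The following is a simplified sketch rather than Hadoop’s actual code: the class is hypothetical, the maps stand in for Hadoop’s configuration and its registry of built-in filesystems, and checking the fs.$scheme.impl property before the built-in registry is an assumption made for the sketch:

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Simplified model of scheme-based filesystem discovery; not Hadoop's FileSystem class. */
final class SchemeLookupSketch {

    /** Built-in filesystems, keyed by the scheme each would report via getScheme(). */
    private final Map<String, String> builtIn = new HashMap<>();

    /** Stand-in for configuration properties such as fs.$scheme.impl. */
    private final Map<String, String> conf = new HashMap<>();

    SchemeLookupSketch() {
        builtIn.put("hdfs", "org.apache.hadoop.hdfs.DistributedFileSystem");
        builtIn.put("file", "org.apache.hadoop.fs.LocalFileSystem");
    }

    void set(String key, String value) {
        conf.put(key, value);
    }

    /** Returns the implementation class name for the filesystem URI in fs.defaultFS. */
    String resolve(String defaultFs) {
        String scheme = URI.create(defaultFs).getScheme();
        String configured = conf.get("fs." + scheme + ".impl");
        if (configured != null) {
            return configured; // explicitly configured implementation
        }
        String impl = builtIn.get(scheme);
        if (impl == null) {
            throw new IllegalArgumentException("No filesystem for scheme: " + scheme);
        }
        return impl;
    }

    public static void main(String[] args) {
        SchemeLookupSketch lookup = new SchemeLookupSketch();
        System.out.println(lookup.resolve("hdfs://localhost:8020"));
        // hypothetical custom filesystem registered through configuration
        lookup.set("fs.myfs.impl", "com.example.MyFileSystem");
        System.out.println(lookup.resolve("myfs://some-host/"));
    }
}
```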

The FileSystem class has a number of methods for manipulating a filesystem—some of the more commonly used methods are listed here:

static FileSystem get(Configuration conf)

static LocalFileSystem getLocal(Configuration conf)

FSDataOutputStream create(Path f)

FSDataInputStream open(Path f, int bufferSize)

boolean delete(Path f, boolean recursive)

boolean mkdirs(Path f)

void copyFromLocalFile(Path src, Path dst)

void copyToLocalFile(Path src, Path dst)

FileStatus getFileStatus(Path f)

void close()

5.2.2. Continuous movement of log and binary files into HDFS

Log data has long been prevalent across all applications, but with Hadoop came the ability to process the large volumes of log data produced by production systems. Various systems produce log data, from network devices and operating systems to web servers and applications. These log files all offer the potential for valuable insights into how systems and applications operate, as well as how they’re used. What unifies log files is that they tend to be in text form and line-oriented, making them easy to process.

In the previous section we covered low-level methods that you can use to copy data into Hadoop. Rather than build your own data-movement tools using these methods, this section introduces some higher-level tools that simplify moving your log and binary data into Hadoop. Tools like Flume, Sqoop, and Oozie provide mechanisms to periodically (or continuously) move data from various data sources such as files, relational databases, and messaging systems into Hadoop, and they’ve already solved many of the hard problems of dealing with multiple data sources spread across different hosts.

Let’s get started by looking at how Flume can be used to ingest log files into HDFS.


Preferred data-movement methods

The techniques in this section work well if you’re working in a constrained legacy environment where you have files that you need to automatically move into HDFS.

An alternative architecture would be to use Kafka as a mechanism to transport your data, which would allow you to decouple the producers from the consumers and at the same time enable multiple consumers to operate on the data in different ways. In this situation, you’d use Kafka to both land data on Hadoop and provide a feed into a real-time data-streaming system such as Storm or Spark Streaming, which you could then use to perform near-real-time computations. One scenario that this enables is a Lambda Architecture, which allows you to calculate aggregated data in real time in small increments, and to use the batch tier to perform functions such as error correction and adding new data points, thus playing to the strengths of both real-time and batch systems.


Technique 39 Pushing system log messages into HDFS with Flume

A bunch of log files are being produced by multiple applications and systems across multiple servers. There’s no doubt there’s valuable information to be mined from these logs, but your first challenge is a logistical one of moving these logs into your Hadoop cluster so that you can perform some analysis.


Versioning caveat emptor

This section on Flume covers release 1.4. As with all software, there are no guarantees that the techniques, code, and configuration covered here will work out of the box with different versions of Flume. Further, Flume 1.4 requires some updates to get it to work with Hadoop 2—see the Flume section in the appendix for more details.



You want to push all of your production servers’ system log files into HDFS.


For this technique you’ll use Flume, a data collection system, to push a Linux log file into HDFS.


Flume, at its heart, is a log file collection and distribution system, and collecting system logs and transporting them to HDFS is its bread and butter. Your first step in this technique will involve capturing all data appended to /var/log/messages and transporting it to HDFS. You’ll run a single Flume agent (more details on what that means later), which will do all this work for you.

A Flume agent needs a configuration file to tell it what to do, so let’s go ahead and define one for this use case:

# define source, channel and sink

agent1.sources = tail_source1

agent1.channels = ch1

agent1.sinks = hdfs_sink1

# define tail source

agent1.sources.tail_source1.type = exec

agent1.sources.tail_source1.channels = ch1

agent1.sources.tail_source1.shell = /bin/bash -c

agent1.sources.tail_source1.command = tail -F /var/log/messages

agent1.sources.tail_source1.interceptors = ts

agent1.sources.tail_source1.interceptors.ts.type = timestamp

# define in-memory channel

agent1.channels.ch1.type = memory

agent1.channels.ch1.capacity = 100000

agent1.channels.ch1.transactionCapacity = 1000

# define HDFS sink properties

agent1.sinks.hdfs_sink1.type = hdfs

agent1.sinks.hdfs_sink1.hdfs.path = /flume/%y%m%d/%H%M%S

agent1.sinks.hdfs_sink1.hdfs.fileType = DataStream

agent1.sinks.hdfs_sink1.channel = ch1

We’ll examine the contents of this file shortly, but before we do that, let’s see Flume in action.


System prerequisites

For the following example to work, you’ll need to make sure that you’re working on a host that has access to a Hadoop cluster (see the appendix if you need to get one up and running), and that your HADOOP_HOME is configured appropriately. You’ll also need to have Flume downloaded and installed and have FLUME_HOME set to point to the installation directory.


Copy the preceding file into your Flume conf directory using the filename tail-hdfs-part1.conf. Once you do that, you’re ready to start an instance of a Flume agent:

$ ${FLUME_HOME}/bin/flume-ng agent \

  --conf ${FLUME_HOME}/conf/ \

  -f ${FLUME_HOME}/conf/tail-hdfs-part1.conf \

  -Dflume.root.logger=DEBUG,console \

  -n agent1

This should generate a lot of output, but ultimately you should see output similar to the following, indicating that everything came up OK:

Component type: CHANNEL, name: ch1 started

Exec source starting with command:tail -F /var/log/messages

Component type: SINK, name: hdfs_sink1 started

At this point, you should start to see some data appearing in HDFS:

$ hadoop fs -ls -R /flume


The .tmp suffix means that Flume has the file open and will continue to write to it. Once it’s done, it’ll rename the file and remove the suffix:


You can cat this file to examine its contents—the contents should line up with tail /var/log/messages.

If you got this far, you’ve completed your first data move with Flume!

Dissecting a Flume agent

Let’s take a few steps back and examine what you did. There were two main parts to your work: defining the Flume configuration file, and running the Flume agent. The Flume configuration file contains details on your sources, channels, and sinks. These are all Flume concepts that impact different parts of Flume’s data flow. Figure 5.4 shows these concepts in action in a Flume agent.

Figure 5.4. Flume components illustrated within the context of an agent

Let’s step through these Flume concepts and look at their purpose and how they work.


Flume sources are responsible for reading data from external clients or from other Flume sinks. A unit of data in Flume is defined as an event, which is essentially a payload and optional set of metadata. A Flume source sends these events to one or more Flume channels, which deal with storage and buffering.

Flume has an extensive set of built-in sources, including HTTP, JMS, and RPC, and you encountered one of them just a few moments ago.[11] Let’s take a look at the source-specific configuration properties that you set:

11 The full set of Flume sources can be seen at http://flume.apache.org/FlumeUserGuide.html#flume-sources.

agent1.sources = tail_source1

# define tail source

agent1.sources.tail_source1.type = exec

agent1.sources.tail_source1.channels = ch1

agent1.sources.tail_source1.shell = /bin/bash -c

agent1.sources.tail_source1.command = tail -F /var/log/messages

agent1.sources.tail_source1.interceptors = ts

agent1.sources.tail_source1.interceptors.ts.type = timestamp

The exec source allows you to execute a Unix command, and each line emitted in standard output is captured as an event (standard error is ignored by default). In the preceding example, the tail -F command is used to capture system messages as they are produced.[12] If you have more control over your files (if, for example, you can move them into a directory after all writes have completed), consider using Flume’s spooling directory source (called spooldir), as it offers reliability semantics that you don’t get with the exec source.

12 Use of the capital F in tail means that tail will continue to retry opening the file, which is useful in situations where the file is rotated.


Only use tail for testing

Using tail for anything other than testing is discouraged.


Another feature highlighted in this configuration is interceptors, which allow you to add metadata to events. Recall that the data in HDFS was organized according to a timestamp: the first part of the path was the date (%y%m%d), and the second part was the time (%H%M%S).


You were able to do this because you modified each event with a timestamp interceptor, which inserted into the event header the time in milliseconds when the source processed the event. This timestamp was then used by the Flume HDFS sink to determine where an event was written.

To conclude our brief dive into Flume sources, let’s summarize some of the interesting abilities that they provide:

·        Transactional semantics, which allow data to be reliably moved with at-least-once semantics. Not all data sources support this.[13]

·        Interceptors, which provide the ability to modify or drop events. They are useful for annotating events with host, time, and unique identifiers, which are useful for deduplication.

·        Selectors, which allow events to be fanned out or multiplexed in various ways. You can fan out events by replicating them to multiple channels, or you can route them to different channels based on event headers.

13 The exec source used in this technique is an example of a source that doesn’t provide any data-reliability guarantees.


Flume channels provide data storage facilities inside an agent. Sources add events to a channel, and sinks remove events from a channel. Channels provide durability properties inside Flume, and you pick a channel based on which level of durability and throughput you need for your application.

There are three channels bundled with Flume:

·        Memory channels store events in an in-memory queue. This is very useful for high-throughput data flows, but they have no durability guarantees, meaning that if an agent goes down, you’ll lose data.

·        File channels persist events to disk. The implementation uses an efficient write-ahead log and has strong durability properties.

·        JDBC channels store events in a database. This provides the strongest durability and recoverability properties, but at a cost to performance.

In the previous example, you used an in-memory channel and capped the number of events that it would store at 100,000. Once the maximum number of events is reached in a memory channel, it will start refusing additional requests from sources to add more events. Depending on the type of source, this means that the source will either retry or drop the event (the exec source will drop the event):

agent1.channels = ch1

# define in-memory channel

agent1.channels.ch1.type = memory

agent1.channels.ch1.capacity = 100000

agent1.channels.ch1.transactionCapacity = 1000
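The capacity behavior is easy to picture as a bounded in-memory queue. Here’s a minimal sketch using the JDK’s ArrayBlockingQueue; the class and its boolean return value are simplifications invented for illustration, and Flume’s real channel API differs:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Toy bounded buffer illustrating a capacity-limited memory channel; not Flume code. */
final class BoundedChannelSketch {

    private final BlockingQueue<String> queue;
    private long dropped;

    BoundedChannelSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Source side: returns false, and counts a dropped event, when the channel is full. */
    boolean put(String event) {
        boolean accepted = queue.offer(event);
        if (!accepted) {
            dropped++; // a source that can't retry loses the event here
        }
        return accepted;
    }

    /** Sink side: removes and returns the oldest event, or null if the channel is empty. */
    String take() {
        return queue.poll();
    }

    long dropped() {
        return dropped;
    }

    public static void main(String[] args) {
        BoundedChannelSketch channel = new BoundedChannelSketch(2);
        channel.put("event-1");
        channel.put("event-2");
        System.out.println(channel.put("event-3")); // the channel is full
        channel.take();                             // the sink drains one event
        System.out.println(channel.put("event-4")); // there's room again
        System.out.println(channel.dropped());
    }
}
```

Everything in the queue lives in the JVM’s heap, which is also why an agent crash loses whatever hasn’t been drained yet.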

Additional details on Flume channels can be seen at http://flume.apache.org/FlumeUserGuide.html#flume-channels.


A Flume sink drains events out of one or more Flume channels and will either forward these events to another Flume source (in a multihop flow), or handle the events in a sink-specific manner. There are a number of sinks built into Flume, including HDFS, HBase, Solr, and Elasticsearch.

In the previous example, you configured the flow to use an HDFS sink:

agent1.sinks = hdfs_sink1

# define HDFS sink properties

agent1.sinks.hdfs_sink1.type = hdfs

agent1.sinks.hdfs_sink1.hdfs.path = /flume/%y%m%d/%H%M%S

agent1.sinks.hdfs_sink1.hdfs.fileType = DataStream

agent1.sinks.hdfs_sink1.channel = ch1

You configured the sink to write files based on a timestamp (note the %y and other timestamp aliases). You could do this because you decorated the events with a timestamp interceptor in the exec source. In fact, you can use any header value to determine the output location for events (for example, you can add a host interceptor and then write files according to which host produced the event).
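To make the path substitution concrete, here’s a small sketch that expands the escapes used in this technique from an event’s headers. It’s an illustration rather than Flume’s implementation: it handles only the six date escapes from the configuration plus the %{header} form, it takes the time zone as an explicit parameter, and the host value is made up:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;

/** Illustrative expansion of hdfs.path escapes; not Flume's own implementation. */
final class HdfsPathEscapes {

    /** Maps a date escape character to the equivalent SimpleDateFormat pattern. */
    private static String datePatternFor(char code) {
        switch (code) {
            case 'y': return "yy"; // two-digit year
            case 'm': return "MM"; // month
            case 'd': return "dd"; // day of month
            case 'H': return "HH"; // hour (00-23)
            case 'M': return "mm"; // minute
            case 'S': return "ss"; // second
            default:
                throw new IllegalArgumentException("Escape not handled in this sketch: %" + code);
        }
    }

    /** Expands a path pattern; the headers must contain "timestamp" in milliseconds. */
    static String expand(String pattern, Map<String, String> headers, TimeZone zone) {
        Date eventTime = new Date(Long.parseLong(headers.get("timestamp")));
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < pattern.length(); i++) {
            char c = pattern.charAt(i);
            if (c != '%' || i + 1 == pattern.length()) {
                out.append(c);
                continue;
            }
            char code = pattern.charAt(++i);
            if (code == '{') {
                // %{name} is replaced with the value of the header called name
                int end = pattern.indexOf('}', i);
                if (end < 0) {
                    throw new IllegalArgumentException("Unclosed %{ in " + pattern);
                }
                out.append(headers.get(pattern.substring(i + 1, end)));
                i = end;
            } else {
                SimpleDateFormat format = new SimpleDateFormat(datePatternFor(code));
                format.setTimeZone(zone);
                out.append(format.format(eventTime));
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("timestamp", "1400000000000"); // set by the timestamp interceptor
        headers.put("host", "web01");              // hypothetical value from a host interceptor
        TimeZone utc = TimeZone.getTimeZone("UTC");
        System.out.println(expand("/flume/%y%m%d/%H%M%S", headers, utc));
        System.out.println(expand("/flume/%{host}/%y%m%d", headers, utc));
    }
}
```

Including %S in the path means a new directory for every second in which events arrive, which is fine for a demo but a good reason to choose a coarser pattern in production.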

The HDFS sink can be configured in various ways to determine how files are rolled. When a sink reads the first event, it will open a new file (if one isn’t already open) and write to it. By default, the sink will continue to keep the file open and write events into it for 30 seconds, after which it will close it out. The rolling behavior can be changed with the properties in table 5.5.

Table 5.5. Rollover properties for Flume’s HDFS sink

Property

Default value

Description

hdfs.rollInterval

30

Number of seconds to wait before rolling current file (0 = never roll based on time interval)

hdfs.rollSize

1024

File size to trigger roll, in bytes (0 = never roll based on file size)

hdfs.rollCount

10

Number of events written to file before it rolls (0 = never roll based on number of events)

hdfs.idleTimeout

0

Timeout after which inactive files get closed (0 = disable automatic closing of idle files)

hdfs.batchSize

100

Number of events written to file before it’s flushed to HDFS
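The three roll triggers combine as a simple "whichever fires first" rule, with 0 switching a trigger off. The following sketch models that rule; the class is hypothetical, and the 30-second, 1,024-byte, and 10-event values are the defaults listed in the Flume user guide for hdfs.rollInterval, hdfs.rollSize, and hdfs.rollCount:

```java
/** Illustrative model of the HDFS sink's roll triggers; not Flume's implementation. */
final class RollPolicySketch {

    private final long rollIntervalSecs;
    private final long rollSizeBytes;
    private final long rollCount;

    RollPolicySketch(long rollIntervalSecs, long rollSizeBytes, long rollCount) {
        this.rollIntervalSecs = rollIntervalSecs;
        this.rollSizeBytes = rollSizeBytes;
        this.rollCount = rollCount;
    }

    /** A file rolls as soon as any enabled trigger is reached; 0 disables a trigger. */
    boolean shouldRoll(long secondsOpen, long bytesWritten, long eventsWritten) {
        return (rollIntervalSecs > 0 && secondsOpen >= rollIntervalSecs)
                || (rollSizeBytes > 0 && bytesWritten >= rollSizeBytes)
                || (rollCount > 0 && eventsWritten >= rollCount);
    }

    public static void main(String[] args) {
        // Defaults: a file rolls after only 10 events, even if it's tiny.
        RollPolicySketch defaults = new RollPolicySketch(30, 1024, 10);
        System.out.println(defaults.shouldRoll(5, 200, 10));

        // Size-only policy: roll at 128 MB, regardless of time or event count.
        RollPolicySketch sizeOnly = new RollPolicySketch(0, 128L * 1024 * 1024, 0);
        System.out.println(sizeOnly.shouldRoll(3600, 1024, 1_000_000));
    }
}
```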


Default settings for the HDFS sink

The default HDFS sink settings shouldn’t be used in production, as they’ll result in a large number of potentially small files. It’s recommended that you either bump up the values or use a downstream compaction job to coalesce these small files.


The HDFS sink allows you to specify how events are serialized when writing files. By default, they’re serialized in text format, without any headers added by interceptors. If, for example, you want to write data in Avro, which also includes event headers, you can use the serializer configuration to do this. In doing so, you can also specify a Hadoop compression codec that Avro uses internally to compress data:

agent1.sinks.hdfs_sink1.serializer = avro_event

agent1.sinks.hdfs_sink1.serializer.compressionCodec = snappy


Reliability in Flume is determined by the type of channel you use, whether your data sources have the ability to retransmit events, and whether you multiplex events to multiple sources to mitigate against unrecoverable node failure. In this technique, the memory channel and exec source were used, but neither provides reliability in the face of failure. One way to add that reliability would be to replace the exec source with a spooling directory source and replace the memory channel with a file channel.

You’ve used Flume on a single machine running a single agent with a single source, channel, and sink. But Flume can support a fully distributed setup where you have agents running on multiple hosts with multiple agent hops between the source and final destinations. Figure 5.5 shows one example of how Flume can function in a distributed environment.

Figure 5.5. A Flume setup that uses load balancing and fan-in to move log4j logs into HDFS

The goal of this technique is to move data into HDFS. Flume, however, can support various data sinks, including HBase, a file roll, Elasticsearch, and Solr. Using Flume to write to Elasticsearch or Solr enables a powerful near-real-time indexing strategy.

Flume, then, is a very powerful data movement project, which can easily support moving your data into HDFS as well as many other locations. It moves data continuously and supports various levels of resiliency to work around failures in your systems. And it’s a simple system to configure and run.

One area that Flume isn’t really optimized for is working with binary data. It can support moving binary data, but it loads the entire binary event into memory, so moving files that are gigabytes in size or larger won’t work. The next technique looks at how such data can be moved into HDFS.

Technique 40 An automated mechanism to copy files into HDFS

You’ve learned how to use log-collecting tools like Flume to automate moving data into HDFS. But these tools don’t support working with semistructured or binary data out of the box. In this technique, we’ll look at how to automate moving such files into HDFS.

Production networks typically have network silos where your Hadoop clusters are segmented away from other production applications. In such cases, it’s possible that your Hadoop cluster won’t be able to pull data from other data sources, leaving you with no option but to push data into Hadoop.

You need a mechanism to automate the process of copying files of any format into HDFS, similar to the Linux tool rsync. The mechanism should be able to compress files written in HDFS and offer a way to dynamically determine the HDFS destination for data-partitioning purposes.

Existing file transportation mechanisms such as Flume, Scribe, and Chukwa are geared toward supporting log files. What if you have different formats for your files, such as semistructured or binary? If the files were siloed in a way that the Hadoop slave nodes couldn’t directly access, then you couldn’t use Oozie to help with file ingress either.


You need to automate the process by which files on remote servers are copied into HDFS.


The open source HDFS File Slurper project can copy files of any format into and out of HDFS. This technique covers how it can be configured and used to copy data into HDFS.


You can use the HDFS File Slurper project (which I wrote) to assist with your automation (https://github.com/alexholmes/hdfs-file-slurper). The HDFS File Slurper is a simple utility that supports copying files from a local directory into HDFS and vice versa.

Figure 5.6 provides a high-level overview of the Slurper (my nickname for the project), with an example of how you can use it to copy files. The Slurper reads any files that exist in a source directory and optionally consults with a script to determine the file placement in the destination directory. It then writes the file to the destination, after which there’s an optional verification step. Finally, the Slurper moves the source file to a completed folder upon successful completion of all of the previous steps.

Figure 5.6. HDFS File Slurper data flow for copying files

With this technique, there are a few challenges you need to make sure to address:

·        How do you effectively partition your writes to HDFS so that you don’t lump everything into a single directory?

·        How do you determine that your data in HDFS is ready for processing (to avoid reading files that are mid-copy)?

·        How do you automate regular execution of your utility?

Your first step is to download the latest HDFS File Slurper tarball from https://github.com/alexholmes/hdfs-file-slurper/releases and install it on a host that has access to both a Hadoop cluster and a local install of Hadoop:

$ sudo tar -xzf target/hdfs-slurper-<version>-package.tar.gz \

  -C /usr/local/

$ sudo ln -s /usr/local/hdfs-slurper-<version> \

  /usr/local/hdfs-slurper

Before you can run the code, you’ll need to edit /usr/local/hdfs-slurper/conf/slurper-env.sh and set the location of the hadoop script. The following code is an example of what the slurper-env.sh file looks like if you followed the Hadoop installation instructions in the appendix:

$ cat /usr/local/hdfs-slurper/conf/slurper-env.sh

export HADOOP_BIN=/usr/local/hadoop/bin/hadoop

The Slurper comes bundled with a /usr/local/hdfs-slurper/conf/slurper.conf file, which contains details on the source and destination directories, along with other options. The file contains the following default settings, which you can change:

Let’s take a closer look at these settings:

·        DATASOURCE_NAME —This specifies the name for the data being transferred. It’s used for the log filename when launched via the Linux init daemon management system, which we’ll cover shortly.

·        SRC_DIR —This specifies the source directory. Any files moved into here are automatically copied to the destination directory (with an intermediary hop to the staging directory).

·        WORK_DIR —This is the work directory. Files from the source directory are moved here before the copy to the destination starts.

·        COMPLETE_DIR —This specifies the complete directory. After the copy has completed, the file is moved from the work directory into this directory. Alternatively, the --remove-after-copy option can be used to delete the source file, in which case the --complete-dir option shouldn’t be supplied.

·        ERROR_DIR —This is the error directory. Any errors encountered during the copy result in the source file being moved into this directory.

·        DEST_DIR —This sets the final destination directory for source files.

·        DEST_STAGING_DIR —This specifies the staging directory. A file is first copied into this directory, and once the copy succeeds, the Slurper moves the copy into the destination to avoid the possibility of the destination directory containing partially written files (in the event of failure).

You’ll notice that all of the directory names are Hadoop filesystem URIs, which is how Hadoop distinguishes between different filesystems. The file:/ URI denotes a path on the local filesystem, and the hdfs:/ URI denotes a path in HDFS. In fact, the Slurper supports any Hadoop filesystem, as long as you configure Hadoop to use it.
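To make the roles of these directories concrete, here is a minimal Python sketch of the flow they describe. It is not the Slurper's code: local directories stand in for both the file:/ and hdfs:/ locations, the function name slurp_once is hypothetical, and the staged copy keeps the source filename rather than a generated one.

```python
import os
import shutil


def slurp_once(src_dir, work_dir, staging_dir, dest_dir, complete_dir, error_dir):
    """Copy every visible file in src_dir to dest_dir via a staging hop.

    Illustration only: all six directories are local paths here.
    Returns the names of the files that were copied successfully.
    """
    for d in (work_dir, staging_dir, dest_dir, complete_dir, error_dir):
        os.makedirs(d, exist_ok=True)

    copied = []
    for name in sorted(os.listdir(src_dir)):
        if name.startswith("."):
            continue  # dot-prefixed files are skipped
        work_path = os.path.join(work_dir, name)
        # SRC_DIR -> WORK_DIR before the copy starts
        os.rename(os.path.join(src_dir, name), work_path)
        try:
            staged = os.path.join(staging_dir, name)
            # copy into DEST_STAGING_DIR first
            shutil.copyfile(work_path, staged)
            # publish the finished copy into DEST_DIR
            os.rename(staged, os.path.join(dest_dir, name))
            # WORK_DIR -> COMPLETE_DIR once everything succeeded
            os.rename(work_path, os.path.join(complete_dir, name))
            copied.append(name)
        except OSError:
            # any failure sends the source file to ERROR_DIR
            os.rename(work_path, os.path.join(error_dir, name))
    return copied
```

Because the destination only ever receives a file through the final rename from staging, a reader of the destination directory never sees a half-written copy in this sketch.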


Let’s create a local directory called /tmp/slurper/in, write an empty file into it, and run the Slurper:

$ mkdir -p /tmp/slurper/in

$ touch /tmp/slurper/in/test-file.txt

$ cd /usr/local/hdfs-slurper/

$ bin/slurper.sh  --config-file conf/slurper.conf

Copying source file 'file:/tmp/slurper/work/test-file.txt'

to staging destination 'hdfs:/tmp/slurper/stage/1354823335'

Moving staging file 'hdfs:/tmp/slurper/stage/1354823335'

to destination 'hdfs:/tmp/slurper/dest/test-file.txt'

File copy successful, moving source

file:/tmp/slurper/work/test-file.txt to completed file


$ hadoop fs -ls /tmp/slurper/dest


A key feature in the Slurper’s design is that it doesn’t work with partially written files. Files must be atomically moved into the source directory (file moves in both the Linux and HDFS filesystems are atomic).[14] Alternatively, you can write to a filename that starts with a period (.), which is ignored by the Slurper, and after the file write completes, you can rename the file to a name without the period prefix.

14 Moving files is atomic only if both the source and destination are on the same partition. In other words, moving a file from an NFS mount to a local disk results in a copy, which isn’t atomic.
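If you control the process that produces the files, the dot-prefix convention can be sketched in a few lines of Python. This is an illustration under stated assumptions: the helper name write_then_publish is hypothetical, and the rename is atomic only because the hidden file and the final file sit in the same directory on a POSIX filesystem.

```python
import os


def write_then_publish(directory, filename, data):
    """Write data under a dot-prefixed name, then rename it into view."""
    hidden = os.path.join(directory, "." + filename)
    final = os.path.join(directory, filename)
    with open(hidden, "wb") as f:
        f.write(data)
        f.flush()
        os.fsync(f.fileno())  # push the bytes to disk before publishing
    # same directory, so this rename is a single atomic step on POSIX
    os.rename(hidden, final)
    return final
```

A directory scanner that ignores dot-prefixed names will either see nothing or see the complete file, never a partial one.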

Be aware that copying multiple files with the same filename will result in the destination being overwritten—the onus is on the user to make sure that files are unique to prevent this from happening.

Dynamic destination paths

The previous approach works well if you’re moving a small number of files into HDFS on a daily basis. But if you’re dealing with a large volume of files, you’ll want to think about partitioning them into separate directories. This has the benefit of giving you more fine-grained control over the input data for your MapReduce jobs, as well as helping with the overall organization of your data in the filesystem (you wouldn’t want all the files on your computer to reside in a single flat directory).

How can you have more dynamic control over the destination directory and the filename that the Slurper uses? The Slurper configuration file has a SCRIPT option (which is mutually exclusive of the DEST_DIR option), where you can specify a script that provides dynamic mapping of the source files to destination files.
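From the caller's side, the contract with such a script is small: the source file URI goes to the script's standard input, and whatever the script prints is taken as the destination. The following Python sketch shows that contract only; resolve_destination is a hypothetical helper, not part of the Slurper, and it assumes Python 3.7 or later for subprocess.run with capture_output.

```python
import subprocess
import sys


def resolve_destination(script_argv, source_uri):
    """Pipe source_uri to the script's stdin and return what it prints."""
    result = subprocess.run(
        script_argv,
        input=source_uri,
        capture_output=True,
        text=True,
        check=True,  # raise if the script exits with a nonzero status
    )
    return result.stdout.strip()


if __name__ == "__main__":
    # A stand-in mapping script, passed inline for the sake of the example.
    inline = (
        "import sys; "
        "print('hdfs:/data/' + sys.stdin.readline().strip().rsplit('/', 1)[-1])"
    )
    print(resolve_destination([sys.executable, "-c", inline],
                              "file:/tmp/slurper/work/test-file.txt"))
```

Keeping the interface to stdin and stdout means the mapping script can be written in any language.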

Let’s assume that the files you’re working with contain a date in the filename, and you’ve decided that you want to organize your data in HDFS by date. You can write a script to perform this mapping activity. The following example is a Python script that does this:


#!/usr/bin/env python

import sys, os, re

# read the local file URI from standard input

input_file = sys.stdin.readline().strip()

# extract the filename from the file

filename = os.path.basename(input_file)

# extract the date (YYYYMMDD) from the filename

match = re.search(r'([0-9]{4})([0-9]{2})([0-9]{2})', filename)

year, mon, day = match.groups()

# construct our destination HDFS file

hdfs_dest = "hdfs:/data/%s/%s/%s/%s" % (year, mon, day, filename)

# write it to standard output without a trailing newline

sys.stdout.write(hdfs_dest)

Now you can update /usr/local/hdfs-slurper/conf/slurper.conf, set SCRIPT, and comment out DEST_DIR, which results in the following entry in the file:

# DEST_DIR = hdfs:/tmp/slurper/dest

SCRIPT = /usr/local/hdfs-slurper/bin/sample-python.py

If you run the Slurper again, you’ll notice that the destination path is now partitioned by date by the Python script:

$ touch /tmp/slurper/in/apache-20110202.log

$ bin/slurper.sh  --config-file conf/slurper.conf

Launching script '/usr/local/hdfs-slurper/bin/sample-python.py' and

piping the following to stdin 'file:/tmp/slurper/work/apache-20110202.log'


Moving staging file 'hdfs:/tmp/slurper/stage/675861557' to destination


Compression and verification

What if you want to compress the output file in HDFS and also verify that the copy is correct? You’ll need to use the COMPRESSION_CODEC option, whose value is a class that implements the CompressionCodec interface. If your compression codec is LZO or LZOP, you can also add a CREATE_LZO_INDEX option so that LZOP indexes are created. If you don’t know what this means, take a look at the LZO coverage in chapter 4.

Also available is a verification feature, which rereads the destination file after the copy has completed and ensures that the checksum of the destination file matches the source file. This results in longer processing times, but it adds an additional level of assurance that the copy was successful.
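The idea behind verification can be sketched in Python as a checksum comparison. This is an assumption-laden illustration rather than the Slurper's implementation: it uses CRC32 from the standard zlib module, compares raw bytes of two local files, and the function names are hypothetical. When a compression codec is in use, the destination would have to be decompressed before its bytes could be compared with the source.

```python
import zlib


def crc32_of(path, chunk_size=1024 * 1024):
    """Return the CRC32 of a file, reading in chunks to bound memory use."""
    crc = 0
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            crc = zlib.crc32(chunk, crc)  # fold each chunk into the running CRC
    return crc & 0xFFFFFFFF


def copies_match(source_path, dest_path):
    """True if both files have the same CRC32."""
    return crc32_of(source_path) == crc32_of(dest_path)
```

Rereading the destination roughly doubles the I/O for each file, which is the cost of the extra assurance.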

The following configuration fragment shows the LZOP codec, LZO indexing, and file verification enabled:

COMPRESSION_CODEC = com.hadoop.compression.lzo.LzopCodec

CREATE_LZO_INDEX = true

VERIFY = true

Let’s run the Slurper again:

$ touch /tmp/slurper/in/apache-20110202.log

$ bin/slurper.sh  --config-file conf/slurper.conf

Verifying files

CRC's match (0)

Moving staging file 'hdfs:/tmp/slurper/stage/535232571'

to destination 'hdfs:/data/2011/02/02/apache-20110202.log.snappy'

Continuous operation

Now that you have the basic mechanics in place, your final step is to run the tool as a daemon so that it continuously looks for files to transfer. To do this, you can use a script called bin/slurper-inittab.sh, which is designed to work with the inittab respawn.[15]

15 Inittab is a Linux process-management tool that you can configure to supervise and restart a process if it goes down. See INITTAB(5) in the Linux System Administrator’s Manual: http://unixhelp.ed.ac.uk/CGI/mancgi?inittab+5.

This script won’t create a PID file or perform a nohup—neither makes sense in the context of respawn, because inittab is managing the process. It uses the DATASOURCE_NAME configuration value to create the log filename. This means that multiple Slurper instances can all be launched with different config files logging to separate log files.


The Slurper is a handy tool for data ingress from a local filesystem to HDFS. It also supports data egress by copying from HDFS to the local filesystem. It can be useful in situations where MapReduce doesn’t have access to the filesystem and the files being transferred are in a form that doesn’t work with tools such as Flume.

Now let’s look at automated pulls for situations where MapReduce or HDFS has access to your data sources.

Technique 41 Scheduling regular ingress activities with Oozie

If your data is sitting on a filesystem, web server, or any other system accessible from your Hadoop cluster, you’ll need a way to periodically pull that data into Hadoop. Tools exist to help with pushing log files and pulling from databases (which we’ll cover in this chapter), but if you need to interface with some other system, it’s likely you’ll need to handle the data ingress process yourself.


Oozie versions

This technique covers using Oozie version 4.0.0.


There are two parts to this data ingress process: how you import data from another system into Hadoop, and how you regularly schedule the data transfer.


You want to automate a daily task to download content from an HTTP server into HDFS.


Oozie can be used to move data into HDFS, and it can also be used to execute post-ingress activities such as launching a MapReduce job to process the ingested data. Now an Apache project, Oozie started life inside Yahoo!. It’s a Hadoop workflow engine that manages data processing activities. Oozie also has a coordinator engine that can start workflows based on data and time triggers.


In this technique, you’ll perform a download from a number of URLs every 24 hours, using Oozie to manage the workflow and scheduling. The flow for this technique is shown in figure 5.7. You’ll use Oozie’s triggering capabilities to kick off a MapReduce job every 24 hours. The appendix contains Oozie installation instructions.

Figure 5.7. Data flow for this Oozie technique

The first step is to look at the coordinator XML configuration file. This file is used by Oozie’s coordination engine to determine when it should kick off a workflow. Oozie uses a template engine and expression language to perform parameterization, as you’ll see in the following code. Create a file called coordinator.xml with the following content:[16]

16 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/oozie/http-download/coordinator.xml.

Listing 5.1. Using a template engine to perform parameterization with Oozie

What can be confusing about Oozie’s coordinator is that the start and end times don’t relate to the actual times when the jobs will be executed. Rather, they refer to the dates that will be created (“materialized”) for each workflow execution. This is useful in situations where you have data being generated at periodic intervals and you want to be able to go back in time to a certain point and perform some work on that data. In this example, you don’t want to go back in time, but instead want to schedule a job every 24 hours going forward. But you won’t want to wait until the next day, so you can set the start date to be yesterday, and the end date to be some far-off date in the future.
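A simplified model may help with the distinction between nominal times and the wall clock. The Python sketch below is not Oozie's implementation; the function name nominal_times is hypothetical, and it assumes the end time is exclusive. It lists the nominal times that are due once the wall clock reaches a given moment.

```python
from datetime import datetime, timedelta


def nominal_times(start, end, frequency, now):
    """List the nominal times that are due at wall-clock time `now`."""
    times = []
    t = start
    # every interval from start up to the present is due, bounded by end
    while t < end and t <= now:
        times.append(t)
        t += frequency
    return times


if __name__ == "__main__":
    start = datetime(2014, 1, 23)
    end = datetime(2026, 11, 29)
    for t in nominal_times(start, end, timedelta(days=1),
                           now=datetime(2014, 1, 25, 10, 0)):
        print(t.isoformat())
```

With a start date in the past, the first nominal time is already due when the job is submitted, which is why setting the start date to yesterday avoids waiting a full day for the first run.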

Next you need to define the actual workflow, which will be executed for every interval in the past, and, going forward, when the wall clock reaches an interval. To do this, create a file called workflow.xml with the content shown in the next listing.[17]

17 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/oozie/http-download/workflow.xml.

Listing 5.2. Defining the past workflow using Oozie’s coordinator


Working with the new MapReduce APIs in Oozie

By default, Oozie expects that your map and reduce classes use the “old” MapReduce APIs. If you want to use the “new” APIs, you need to specify additional properties:


















The last step is to define your properties file, which specifies how to get to HDFS, MapReduce, and the location of the two XML files previously identified in HDFS. Create a file called job.properties, as shown in the following code:


JobTracker property for different Hadoop versions

If you’re targeting Hadoop 1, you should use the JobTracker RPC port in the jobTracker property (the default is 8021). Otherwise use the YARN ResourceManager RPC port (the default is 8032).


In the previous snippet, the location in HDFS indicates where the coordinator.xml and workflow.xml files that you wrote earlier in this chapter are. Now you need to copy the XML files, your input file, and the JAR file containing your MapReduce code into HDFS:

$ hadoop fs -put oozie/http-download http-download

$ hadoop fs -put test-data/ch5/http-download/input/* http-download/

$ hadoop fs -mkdir http-download/lib

$ hadoop fs -put hip-2.0.0.jar http-download/lib/

Finally, run your job in Oozie:

$ oozie job -config src/main/oozie/http-download/job.properties \

  -run
job: 0000006-140124164013396-oozie-ahol-C

You can use the job ID to get some information about the job:

$ oozie job -info 0000006-140124164013396-oozie-ahol-C

Job ID : 0000006-140124164013396-oozie-ahol-C


Job Name    : http-download

App Path    : hdfs://localhost:8020/user/aholmes/http-download

Status      : RUNNING

Start Time  : 2014-01-23 00:00 GMT

End Time    : 2026-11-29 00:00 GMT

Pause Time  : -

Concurrency : 1


ID                 Status    Created              Nominal Time

0000006-1401241... SUCCEEDED 2014-02-16 20:50 GMT 2014-01-23 00:00 GMT


This output tells you that the job resulted in one run, and you can see the nominal time for the run. The overall state is RUNNING, which means that the job is waiting for the next interval to occur. When the overall job has completed (after the end date has been reached), the status will transition to SUCCEEDED.

You can confirm that there is an output directory in HDFS corresponding to the materialized date:

$ hadoop fs -lsr http-download/output


As long as the job is running, it’ll continue to execute until the end date, which in this example has been set as the year 2026. If you wish to stop the job, use the -suspend option:

$ oozie job -suspend 0000006-140124164013396-oozie-ahol-C

Oozie also has the ability to resume suspended jobs, as well as to kill a workflow, using the -resume and -kill options, respectively.


I showed you one example of the use of the Oozie coordinator, which offers cron-like capabilities to launch periodic Oozie workflows. The Oozie coordinator can also be used to trigger a workflow based on data availability (if no data is available, the workflow isn’t triggered). For example, if you had an external process, or even MapReduce generating data on a regular basis, you could use Oozie’s data-driven coordinator to trigger a workflow, which could aggregate or process that data.

In this section, we covered three automated mechanisms that can be used for data ingress purposes. The first technique covered Flume, a powerful tool for shipping your log data into Hadoop, and the second technique looked at the HDFS File Slurper, which automates the process of pushing data into HDFS. The final technique looked at how Oozie could be used to periodically launch a MapReduce job to pull data into HDFS or MapReduce.

At this point in our exploration of data ingress, we’ve looked at pushing log files, pushing files from regular filesystems, and pulling files from web servers. Another data source that will be of interest to most organizations is relational data sitting in OLTP databases. Next up is a look at how you can access that data.

5.2.3. Databases

Most organizations’ crucial data exists across a number of OLTP databases. The data stored in these databases contains information about users, products, and a host of other useful items. If you wanted to analyze this data, the traditional way to do so would be to periodically copy that data into an OLAP data warehouse.

Hadoop has emerged to play two roles in this space: as a replacement for data warehouses, and as a bridge between structured and unstructured data and data warehouses. Figure 5.8 shows the bridging role, where Hadoop is used as a large-scale joining and aggregation mechanism prior to exporting the data to an OLAP system (a commonly used platform for business intelligence applications).

Figure 5.8. Using Hadoop for data ingress, joining, and egress to OLAP

Facebook is an example of an organization that has successfully utilized Hadoop and Hive as an OLAP platform for working with petabytes of data. Figure 5.9 shows an architecture similar to that of Facebook’s. This architecture also includes a feedback loop into the OLTP system, which can be used to push discoveries made in Hadoop, such as recommendations for users.

Figure 5.9. Using Hadoop for OLAP and feedback to OLTP systems

In either usage model, you need a way to bring relational data into Hadoop, and you also need to export it into relational databases. In this section, you’ll use Sqoop to streamline moving your relational data into Hadoop.

Technique 42 Using Sqoop to import data from MySQL

Sqoop is a project that you can use to move relational data into and out of Hadoop. It’s a great high-level tool as it encapsulates the logic related to the movement of the relational data into Hadoop—all you need to do is supply Sqoop the SQL queries that will be used to determine which data is exported. This technique provides the details on how you can use Sqoop to move some stock data in MySQL to HDFS.



This section uses version 1.4.4 of Sqoop. The code and scripts used in this technique may not work with other versions of Sqoop, especially Sqoop 2, which is implemented as a web application.



You want to load relational data into your cluster and ensure your writes are efficient and also idempotent.


In this technique, we’ll look at how you can use Sqoop as a simple mechanism to bring relational data into Hadoop clusters. We’ll walk through the process of importing data from MySQL into HDFS with Sqoop. We’ll also cover bulk imports using the fast connector (connectors are database-specific components that provide database read and write access).


Sqoop is a relational database import and export system. It was created by Cloudera and has since graduated from the Apache Incubator to become a top-level Apache project.

When you perform an import, Sqoop can write to HDFS, Hive, and HBase, and for exports it can do the reverse. Importing is divided into two activities: connecting to the data source to gather some statistics, and then firing off a MapReduce job that performs the actual import. Figure 5.10 shows these steps.

Figure 5.10. Sqoop import overview: connecting to the data source and using MapReduce

Sqoop has the notion of connectors, which contain the specialized logic needed to read and write to external systems. Sqoop comes with two classes of connectors: common connectors for regular reads and writes, and fast connectors that use database-proprietary batch mechanisms for efficient imports. Figure 5.11 shows these two classes of connectors and the databases that they support.

Figure 5.11. Sqoop connectors used to read and write to external systems

Before you can continue, you’ll need access to a MySQL database and the MySQL JDBC JAR will need to be available.[18] The following script will create the necessary MySQL user and schema and load the data for this technique. The script creates a hip_sqoop_user MySQL user, and creates a sqoop_test database with three tables: stocks, stocks_export, and stocks_staging. It then loads the stocks sample data into the stocks table. All of these steps are performed by running the following command:

18 MySQL installation instructions can be found in the appendix, if you don’t already have it installed. That section also includes a link to get the JDBC JAR.

$ bin/prep-sqoop-mysql

Here’s a quick peek at what the script does:

$ mysql -u hip_sqoop_user -p

<enter "password" for the password>

mysql> use sqoop_test;

mysql> show tables;


+----------------------+
| Tables_in_sqoop_test |
+----------------------+
| stocks               |
| stocks_export        |
| stocks_staging       |
+----------------------+

3 rows in set (0.00 sec)

mysql> select * from stocks;


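+----+--------+------------+------------+------------+-----------+...
| id | symbol | quote_date | open_price | high_price | low_price |...
+----+--------+------------+------------+------------+-----------+...
|  1 | AAPL   | 2009-01-02 |      85.88 |      91.04 |     85.16 |...
|  2 | AAPL   | 2008-01-02 |     199.27 |     200.26 |    192.55 |...
|  3 | AAPL   | 2007-01-03 |      86.29 |      86.58 |      81.9 |...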


Follow the instructions in the appendix to install Sqoop. Those instructions also contain important steps for installing Sqoop dependencies, such as MySQL JDBC drivers.

Your first Sqoop command will be a basic import, where you’ll specify connection information for your MySQL database and the table you want to import:

$ sqoop import \

    --username hip_sqoop_user \

    --password password \

    --connect jdbc:mysql://localhost/sqoop_test \

    --table stocks


MySQL table names

MySQL table names in Linux are case-sensitive. Make sure that the table name you supply in the Sqoop commands uses the correct case.


By default, Sqoop uses the table name as the destination in HDFS for the MapReduce job that it launches to perform the import. If you run the same command again, the MapReduce job will fail because the directory already exists.

Let’s take a look at the stocks directory in HDFS:

$ hadoop fs -ls stocks

624 2011-11-24 11:07 /user/aholmes/stocks/part-m-00000

644 2011-11-24 11:07 /user/aholmes/stocks/part-m-00001

642 2011-11-24 11:07 /user/aholmes/stocks/part-m-00002

686 2011-11-24 11:07 /user/aholmes/stocks/part-m-00003

$ hadoop fs -cat stocks/part-m-00000





Import data formats

Sqoop has imported the data as comma-separated text files. It supports a number of other file formats, which can be activated with the arguments listed in table 5.6.

Table 5.6. Sqoop arguments that control the file formats of import commands




Argument            Description
--as-avrodatafile   Data is imported as Avro files.
--as-sequencefile   Data is imported as SequenceFiles.
--as-textfile       The default file format; data is imported as CSV text files.

If you’re importing large amounts of data, you may want to use a file format such as Avro, which is a compact data format, and use it in conjunction with compression. The following example uses the Snappy compression codec in conjunction with Avro files. It also writes the output to a different directory from the table name by using the --target-dir option and specifies that a subset of rows should be imported by using the --where option. Specific columns to be extracted can be specified with --columns:

$ sqoop import \

    --username hip_sqoop_user \

    --password password \

    --as-avrodatafile \

    --compress \

    --compression-codec org.apache.hadoop.io.compress.SnappyCodec \

    --connect jdbc:mysql://localhost/sqoop_test \

    --table stocks \

    --where "symbol = 'AAPL'" \

    --columns "symbol,quote_date,close_price" \

    --target-dir mystocks

Note that the compression that’s supplied on the command line must be defined in the config file, core-site.xml, under the io.compression.codecs property. The Snappy compression codec requires you to have the Hadoop native libraries installed. See chapter 4 for more details on compression setup and configuration.

You can introspect the structure of the Avro file to see how Sqoop has laid out the records by using the AvroDump tool introduced in technique 12. Sqoop uses Avro’s GenericRecord for record-level storage (more details on that in chapter 3). If you run AvroDump against the Sqoop-generated files in HDFS, you’ll see the following:

$ hip hip.util.AvroDump --file mystocks/part-m-00000.avro

{"symbol": "AAPL", "quote_date": "2009-01-02", "close_price": 90.75}

{"symbol": "AAPL", "quote_date": "2008-01-02", "close_price": 194.84}

{"symbol": "AAPL", "quote_date": "2007-01-03", "close_price": 83.8}


Using Sqoop in conjunction with SequenceFiles

One of the things that makes SequenceFiles hard to work with is that there isn’t a generic way to access data in a SequenceFile. You must have access to the Writable class that was used to write the data. In Sqoop’s case, it code-generates this class, which introduces a major problem: if you move to a newer version of Sqoop, and that version modifies the code generator, there’s a chance your older code-generated class won’t work with SequenceFiles generated with the newer version of Sqoop. You’ll either need to migrate all of your old SequenceFiles to the new version, or have code that can work with different versions of these SequenceFiles. Due to this restriction, I don’t recommend using SequenceFiles with Sqoop. If you’re looking for more information on how SequenceFiles work, run the Sqoop import tool and look at the stocks.java file that’s generated within your working directory.


You can take things a step further and specify the entire query with the --query option as follows:

Securing passwords

Up until now you’ve been using passwords in the clear on the command line. This is a security hole, because other users on the host can easily list the running processes and see your password. Luckily Sqoop has a few mechanisms that you can use to avoid leaking your password.

The first approach is to use the -P option, which will result in Sqoop prompting you for the password. This is the most secure approach, as it doesn’t require you to store your password, but it means you can’t automate your Sqoop commands.

The second approach is to use the --password-file option, where you specify a file that contains your password. Note that this file must exist in the configured filesystem (most likely HDFS), not on a disk local to the Sqoop client. You’ll probably want to lock the file down so that only you have read access to it. This still isn’t the most secure option, as root users on the filesystem would still be able to pry into the file, and unless you’re running secure Hadoop, it’s fairly easy even for non-root users to gain access.

The last option is to use an options file. Create a file called ~/.sqoop-import-opts:






Don’t forget to lock down the file to avoid prying eyes:

$ chmod 600 ~/.sqoop-import-opts

Then you can supply this filename to your Sqoop job via the --options-file option, and Sqoop will read the options specified in the file, which means you don’t need to supply them on the command line:

$ sqoop \

    --options-file ~/.sqoop-import-opts \

    --connect jdbc:mysql://localhost/sqoop_test \

    --table stocks

Data splitting

How is Sqoop able to parallelize imports across multiple mappers?[19] In figure 5.10 I showed how Sqoop’s first step is to pull metadata from the database. It inspects the table being imported to determine the primary key, and runs a query to determine the lower and upper bounds of the data in the table (see figure 5.12). A somewhat even distribution of data within the minimum and maximum keys is assumed by Sqoop as it divides the delta (the range between the minimum and maximum keys) by the number of mappers. Each mapper is then fed a unique query containing a range of the primary key.

19 By default Sqoop runs with four mappers. The number of mappers can be controlled with the --num-mappers argument.

Figure 5.12. Sqoop preprocessing steps to determine query splits
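The arithmetic behind the split can be sketched in Python. This is a simplified model for an integer key, not Sqoop's code: the function names and the exact SQL text are assumptions made for illustration.

```python
def split_ranges(min_key, max_key, num_mappers):
    """Divide [min_key, max_key] into contiguous integer ranges, one per mapper."""
    if max_key < min_key:
        return []
    total = max_key - min_key + 1
    num_splits = max(1, min(num_mappers, total))
    base, extra = divmod(total, num_splits)
    ranges = []
    lo = min_key
    for i in range(num_splits):
        size = base + (1 if i < extra else 0)  # spread the remainder over the first splits
        hi = lo + size - 1
        ranges.append((lo, hi))
        lo = hi + 1
    return ranges


def split_queries(table, column, min_key, max_key, num_mappers):
    """Build one range-restricted query per split (illustrative SQL only)."""
    return [
        "SELECT * FROM %s WHERE %s >= %d AND %s <= %d" % (table, column, lo, column, hi)
        for lo, hi in split_ranges(min_key, max_key, num_mappers)
    ]


if __name__ == "__main__":
    for query in split_queries("stocks", "id", 1, 100, 4):
        print(query)
```

Note that the ranges are equal in width, not in row count. If most rows cluster in one part of the key space, one mapper ends up doing most of the work, which is the situation a different split column is meant to address.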

You can configure Sqoop to use a nonprimary key with the --split-by argument. This can be useful in situations where the primary key doesn’t have an even distribution of values between the minimum and maximum values. For large tables, however, you need to be careful that the column specified in --split-by is indexed to ensure optimal import times.

You can use the --boundary-query argument to construct an alternative query to determine the minimum and maximum values.

Incremental imports

You can also perform incremental imports. Sqoop supports two types: append works for numerical data that’s incrementing over time, such as auto-increment keys; lastmodified works on timestamped data. In both cases you need to specify the column using --check-column, the mode via the --incremental argument (the value must be either append or lastmodified), and the actual value to use to determine the incremental changes via --last-value.

For example, if you want to import stock data that’s newer than January 1, 2005, you’d do the following:

$ sqoop  import \

    --username hip_sqoop_user \

    --password password \

    --check-column "quote_date" \

    --incremental "lastmodified" \

    --last-value "2005-01-01" \

    --connect jdbc:mysql://localhost/sqoop_test \

    --table stocks


tool.ImportTool:  --incremental lastmodified

tool.ImportTool:   --check-column quote_date

tool.ImportTool:   --last-value 2014-02-17 07:58:39.0

tool.ImportTool: (Consider saving this with 'sqoop job --create')


Assuming that there’s another system that’s continuing to write into the stocks table, you’d use the --last-value output of this job as the input to the subsequent Sqoop job so that only rows newer than that date will be imported.
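The bookkeeping involved in chaining incremental imports can be sketched in Python. The sketch models append-style behavior on an in-memory list of rows, under the assumption that the next last value is the largest check-column value seen; the function name incremental_rows is hypothetical, and this is not how Sqoop is implemented.

```python
def incremental_rows(rows, check_column, last_value):
    """Return the rows newer than last_value and the value to use next time."""
    selected = [r for r in rows if r[check_column] > last_value]
    # carry the old value forward when nothing new was found
    new_last_value = max((r[check_column] for r in selected), default=last_value)
    return selected, new_last_value


if __name__ == "__main__":
    table = [{"id": 1}, {"id": 2}, {"id": 3}]
    first, last = incremental_rows(table, "id", 1)
    print(len(first), last)
    second, last = incremental_rows(table, "id", last)
    print(len(second), last)
```

Feeding each run's returned value into the next run is what keeps repeated imports from picking up the same rows twice.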

Sqoop jobs and the metastore

You can see in the command output the last value that was encountered for the increment column. How can you best automate a process that can reuse that value? Sqoop has the notion of a job, which can save this information and reuse it in subsequent executions:

Executing the preceding command creates a named job in the Sqoop metastore, which keeps track of all jobs. By default, the metastore is contained in your home directory under .sqoop and is only used for your own jobs. If you want to share jobs between users and teams, you’ll need to install a JDBC-compliant database for Sqoop’s metastore and use the --meta-connect argument to specify its location when issuing job commands.

The job create command executed in the previous example didn’t do anything other than add the job to the metastore. To run the job, you need to explicitly execute it as shown here:

The metadata displayed by the --show argument includes the last value of your incremental column. This is actually the time when the command was executed, and not the last value in the table. If you’re using this feature, make sure that the database server and any clients interacting with the server (including the Sqoop client) have their clocks synced with the Network Time Protocol (NTP).

Sqoop will prompt for a password when running the job. To make this work in an automated script, you’ll need to use Expect, a Linux automation tool, to supply the password from a local file when it detects Sqoop prompting for a password. An Expect script that works with Sqoop can be found on GitHub at https://github.com/alexholmes/hadoop-book/blob/master/bin/sqoop-job.exp.

Sqoop jobs can also be deleted as shown here:

$ sqoop job --delete stock_increment

Fast MySQL imports

What if you want to bypass JDBC altogether and use the fast MySQL Sqoop connector for a high-throughput load into HDFS? This approach uses the mysqldump utility shipped with MySQL to perform the load. You must make sure that mysqldump is in the path of the user running the MapReduce job. To enable use of the fast connector you must specify the --direct argument:

$ sqoop --options-file ~/.sqoop-import-opts \

    --direct \

    --connect jdbc:mysql://localhost/sqoop_test \

    --table stocks

What are the disadvantages of fast connectors? Fast connectors only work with text output files—specifying Avro or SequenceFile as the output format of the import won’t work.

Importing to Hive

The final step in this technique is to use Sqoop to import your data into a Hive table. The only difference between an HDFS import and a Hive import is that the Hive import has a postprocessing step where the Hive table is created and loaded, as shown in figure 5.13.

Figure 5.13. The Sqoop Hive import sequence of events

When data is loaded into Hive from an HDFS file or directory, as in the case of Sqoop Hive imports (step 4 in the figure), Hive moves the directory into its warehouse rather than copying the data (step 5) for the sake of efficiency. The HDFS directory that the Sqoop MapReduce job writes to won’t exist after the import.

Hive imports are triggered via the --hive-import argument. Just like with the fast connector, this option isn’t compatible with the --as-avrodatafile[20] and --as-sequencefile options:

20 See https://issues.apache.org/jira/browse/SQOOP-324 for a potential future fix.

$ sqoop --options-file ~/.sqoop-import-opts \

    --hive-import \

    --connect jdbc:mysql://localhost/sqoop_test \

    --table stocks

$ hive

hive> select * from stocks;


1 AAPL  2009-01-02  85.88  91.04   85.16   90.75  26643400   90.75

2 AAPL  2008-01-02 199.27 200.26  192.55  194.84  38542100  194.84

3 AAPL  2007-01-03  86.29  86.58   81.9    83.8   44225700   83.8

4 AAPL  2006-01-03  72.38  74.75   72.25   74.75  28829800   74.75



Importing strings containing Hive delimiters

You’ll likely have downstream processing issues if you’re importing columns that can contain any of Hive’s delimiters (the \n, \r, and \01 characters). You have two options in such cases: either specify --hive-drop-import-delims, which will remove conflicting characters as part of the import, or specify --hive-delims-replacement, which will replace them with a different character.
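
To make the difference between the two options concrete, here is a minimal sketch of their effect on a single column value. The HiveDelims class and its drop and replace methods are hypothetical illustrations and aren’t Sqoop’s own code.

```java
import java.util.regex.Matcher;

// Hypothetical illustration of the two options' effect; not Sqoop's code.
class HiveDelims {

  // Hive's default delimiters: \n, \r, and \01 (Ctrl-A).
  private static final String HIVE_DELIMS = "[\\n\\r\\x01]";

  // Effect of --hive-drop-import-delims: conflicting characters are removed.
  static String drop(String value) {
    return value.replaceAll(HIVE_DELIMS, "");
  }

  // Effect of --hive-delims-replacement: they are swapped for another string.
  static String replace(String value, String replacement) {
    return value.replaceAll(HIVE_DELIMS, Matcher.quoteReplacement(replacement));
  }

  public static void main(String[] args) {
    String dirty = "line one\nline two\001end";
    System.out.println(drop(dirty));
    System.out.println(replace(dirty, " "));
  }
}
```

Dropping the characters joins the neighboring text, whereas a replacement such as a space keeps word boundaries intact, which is usually preferable for free-text columns.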


If the Hive table already exists, the data will be appended to the existing table. If this isn’t the desired behavior, you can use the --hive-overwrite argument to indicate that the existing table should be replaced with the imported data.

You can also tell Sqoop to compress data being written to Hive tables. Sqoop currently only supports text outputs for Hive, so the LZOP compression codec is the best option here as it can be split in Hadoop (see chapter 4 for details).[21] The following example shows how to use --hive-overwrite in conjunction with LZOP compression. For this to work, you’ll need to have LZOP built and installed on your cluster, because it isn’t bundled with Hadoop (or CDH) by default. Refer to chapter 4 for more details:

21 bzip2 is also a splittable compression codec that can be used in Hadoop, but its write performance is so poor that in practice it’s rarely used.

$ hive

hive> drop table stocks;

$ hadoop fs -rmr stocks

$ sqoop --options-file ~/.sqoop-import-opts \

    --hive-import \

    --hive-overwrite \

    --compress \

    --compression-codec com.hadoop.compression.lzo.LzopCodec \

    --connect jdbc:mysql://localhost/sqoop_test \

    --table stocks

Finally, you can use the --hive-partition-key and --hive-partition-value arguments to create different Hive partitions based on the value of a column being imported. For example, if you want to partition your input by stock name, you do the following:

$ hive

hive> drop table stocks;

$ hadoop fs -rmr stocks

$ read -d '' query << "EOF"

SELECT id, quote_date, open_price

FROM stocks

WHERE symbol = "AAPL" AND $CONDITIONS

EOF

$ sqoop --options-file ~/.sqoop_import_options.txt \

    --query "$query" \

    --split-by id \

    --hive-import \

    --hive-table stocks \

    --hive-overwrite \

    --hive-partition-key symbol \

    --hive-partition-value "AAPL" \

    --connect jdbc:mysql://localhost/sqoop_test \

    --target-dir stocks

$ hadoop fs -lsr /user/hive/warehouse




Now, the previous example isn’t optimal by any means. Ideally, a single import would be able to create multiple Hive partitions. Because you’re limited to specifying a single key and value, you’d need to run the import once per unique partition value, which is laborious. You’d be better off importing into a nonpartitioned Hive table, and then retroactively creating partitions on the table after it had been loaded.

The SQL query that you supply to Sqoop must also take care of filtering the results, so that only the rows that match the partition are included. In other words, it would have been useful if Sqoop had added symbol = "AAPL" to the WHERE clause rather than you having to do this yourself.

Continuous Sqoop execution

If you need to regularly schedule imports into HDFS, Oozie has Sqoop integration that will allow you to periodically perform imports and exports. A sample Oozie workflow.xml example follows:

<workflow-app xmlns="uri:oozie:workflow:0.2" name="sqoop-wf">

  <start to="sqoop-node"/>

  <action name="sqoop-node">

    <sqoop xmlns="uri:oozie:sqoop-action:0.2">

      <job-tracker>${jobTracker}</job-tracker>

      <name-node>${nameNode}</name-node>

      <prepare>

        <delete path="${nameNode}/output-data/sqoop"/>

        <mkdir path="${nameNode}/output-data"/>

      </prepare>

      <command>import

       --username hip_sqoop_user

       --password password

       --connect jdbc:mysql://localhost/sqoop_test

       --table stocks --target-dir ${nameNode}/output-data/sqoop

       -m 1

      </command>

    </sqoop>

    <ok to="end"/>

    <error to="fail"/>

  </action>

  <kill name="fail">

    <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>

  </kill>

  <end name="end"/>

</workflow-app>

Single and double quotes aren’t supported within the <command> element, so if you need to specify arguments that contain spaces, you’ll need to use <arg> elements instead, with each Sqoop argument and each argument value placed in its own <arg> element.






One other consideration when using Sqoop from Oozie is that you’ll need to make the JDBC driver JAR available to Oozie. You can either copy the JAR into the workflow’s lib/ directory or update your Hadoop installation’s lib directory with the JAR.


Obviously, for Sqoop to work, your Hadoop cluster nodes need to have access to the MySQL database. Common sources of error are either misconfiguration or lack of connectivity from the Hadoop nodes. It’s probably wise to log on to one of the Hadoop nodes and attempt to connect to the MySQL server using the MySQL client, or attempt access with the mysqldump utility (if you’re using a fast connector).

Another important point when using a fast connector is that it’s assumed that mysqldump is installed on each Hadoop node and is in the path of the user running the map tasks.

This wraps up our review of using Sqoop to import data from relational databases into Hadoop. We’ll now transition from relational stores to a NoSQL store, HBase, which excels at data interoperability with Hadoop because it uses HDFS to store its data.

5.2.4. HBase

Our final foray into moving data into Hadoop involves taking a look at HBase. HBase is a real-time, distributed, data storage system that’s often either colocated on the same hardware that serves as your Hadoop cluster or is in close proximity to a Hadoop cluster. Being able to work with HBase data directly in MapReduce, or to push it into HDFS, is one of the huge advantages when picking HBase as a solution.

In the first technique, I’ll show you how to use a tool that HBase is bundled with to save an HBase table into HDFS.

Technique 43 HBase ingress into HDFS

What if you had customer data sitting in HBase that you wanted to use in MapReduce in conjunction with data in HDFS? You could write a MapReduce job that takes as input the HDFS dataset and pulls data directly from HBase in your map or reduce code. But in some cases it may be more useful to take a dump of the data in HBase directly into HDFS, especially if you plan to utilize that data in multiple MapReduce jobs and the HBase data is immutable or changes infrequently.


You want to get HBase data into HDFS.


HBase includes an Export class that can be used to export HBase data into HDFS in SequenceFile format. This technique also walks through code that can be used to read the exported HBase data.


Before we get started with this technique, you need to get HBase up and running.[22]

22 The appendix contains installation instructions and additional resources for working with HBase.

To be able to export data from HBase you first need to load some data into HBase. The loader creates an HBase table called stocks_example with a single column family, details. You’ll store the HBase data as Avro binary-serialized data. I won’t show the code here, but it’s available on GitHub.[23]

23 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch5/hbase/HBaseWriter.java.

Run the loader and use it to load the sample stock data into HBase:

$ hip hip.ch5.hbase.HBaseWriter \

    --input test-data/stocks.txt

You can use the HBase shell to look at the results of the load. The list command, without any arguments, will show you all of the tables in HBase, and the scan command, with a single argument, will dump all of the contents of a table:

$ hbase shell

hbase(main):012:0> list



1 row(s) in 0.0100 seconds

hbase(main):007:0> scan 'stocks_example'

ROW              COLUMN+CELL

AAPL2000-01-03   column=details:stockAvro, timestamp=1322315975123,...

AAPL2001-01-02   column=details:stockAvro, timestamp=1322315975123,...


With your data in place, you’re ready to export it to HDFS. HBase comes with an org.apache.hadoop.hbase.mapreduce.Export class that will dump an HBase table. An example of using the Export class is shown in the following snippet. With this command, you can export the whole HBase table:

$ hbase org.apache.hadoop.hbase.mapreduce.Export \

    stocks_example \

    output

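The Export class also supports exporting only a single column family, and it can compress the output. Both are controlled with -D properties placed before the table name: hbase.mapreduce.scan.column.family selects the column family, and the standard MapReduce output-compression properties (such as mapred.output.compress and mapred.output.compression.codec) enable compression.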

The Export class writes the HBase output in the SequenceFile format, where the HBase row key is stored in the SequenceFile record key using org.apache.hadoop.hbase.io.ImmutableBytesWritable, and the HBase value is stored in the SequenceFile record value using org.apache.hadoop.hbase.client.Result.

What if you want to process that exported data in HDFS? The following listing shows an example of how you’d read the HBase SequenceFile and extract the Avro stock records.[24]

24 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch5/hbase/ExportedReader.java.

Listing 5.3. Reading the HBase SequenceFile to extract Avro stock records

You can run the code against the HDFS directory that you used for the export and view the results:

$ hip hip.ch5.hbase.ExportedReader \

    --input output/part-m-00000

AAPL2000-01-03: AAPL,2000-01-03,104.87,...

AAPL2001-01-02: AAPL,2001-01-02,14.88,...

AAPL2002-01-02: AAPL,2002-01-02,22.05,...


The ExportedReader class is able to read and dump out the contents of the SequenceFile used by HBase’s Export class.

Exporting data from HBase into HDFS is made easier with the built-in HBase Export class. But what if you don’t want to write HBase data into HDFS, but instead want to process it directly in a MapReduce job? Let’s look at how you can use HBase as a data source for a MapReduce job.

Technique 44 MapReduce with HBase as a data source

The built-in HBase exporter writes out HBase data using SequenceFile, which isn’t supported by programming languages other than Java and doesn’t support schema evolution. It also only supports a Hadoop filesystem as the data sink. If you want to have more control over HBase data extracts, you may have to look beyond the built-in HBase facilities.


You want to operate on HBase directly within your MapReduce jobs without the intermediary step of copying the data into HDFS.


HBase has a TableInputFormat class that can be used in your MapReduce job to pull data directly from HBase.


HBase provides an InputFormat class called TableInputFormat, which can use HBase as a data source in MapReduce. The following listing shows a MapReduce job that uses this input format (via the TableMapReduceUtil.initTableMapperJob call) to read data from HBase.[25]

25 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch5/hbase/ImportMapReduce.java.

Listing 5.4. Importing HBase data into HDFS using MapReduce

You can run this MapReduce job as follows:

$ hip hip.ch5.hbase.ImportMapReduce --output output

A quick peek in HDFS should tell you whether or not your MapReduce job worked as expected:

$ hadoop fs -cat output/part*

AAPL    111.94

AAPL    14.88

AAPL    23.3

This output confirms that the MapReduce job works as expected.


The TableInputFormat class examines HBase and creates an input split for each HBase table region. If there are 10 HBase regions, 10 map tasks will execute. The input format also includes the server that hosts the region in the input split, which means that the map tasks will be scheduled to execute on the same nodes as the HRegionServer hosting the data. This gives you locality at the HBase level, but also at the HDFS level. Data being read from the region will likely be coming from local disk, because after some time, all of a region’s data will be local to it. This all assumes that the HRegionServers are running on the same hosts as the DataNodes.

Our focus over the last couple of sections has been on persistent stores, covering relational databases and HBase, a NoSQL store. We’re now going to change directions and look at how a publish-subscribe system can be leveraged to move data into Hadoop.

5.2.5. Importing data from Kafka

Kafka, a distributed publish-subscribe system, is quickly becoming a key part of our data pipelines thanks to its strong distributed and performance properties. It can be used for many functions, such as messaging, metrics collection, stream processing, and log aggregation. Another effective use of Kafka is as a vehicle to move data into Hadoop. This is useful in situations where you have data being produced in real time that you want to land in Hadoop.

A key reason to use Kafka is that it decouples data producers and consumers. It notably allows you to have multiple independent producers (possibly written by different development teams), and, likewise, multiple independent consumers (again possibly written by different teams). Also, consumption can be real-time/synchronous or batch/offline/asynchronous. The latter property is a big differentiator when you’re looking at other pub-sub tools like RabbitMQ.

Kafka has a handful of concepts that you’ll need to understand:

·        Topics —A topic is a feed of related messages.

·        Partitions —Each topic is made up of one or more partitions, which are ordered sequences of messages backed by log files.[26]

26 I’m not talking about logging files here; Kafka employs log files to store data flowing through Kafka.

·        Producers and consumers —Producers and consumers write messages to and read them from partitions.

·        Brokers —Brokers are the Kafka processes that manage topics and partitions and serve producer and consumer requests.

Kafka does not guarantee “total” ordering for a topic—instead, it only guarantees that the individual partitions that make up a topic are ordered. It’s up to the consumer application to enforce, if needed, a “global” per-topic ordering.
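
If a consumer does need a global ordering, one option is to merge the per-partition sequences on a sortable key. The following is a minimal sketch, assuming every message carries a producer-assigned timestamp and that each partition is already ordered by that timestamp; the PartitionMerge and Message classes are hypothetical and aren’t part of the Kafka API. It also works on a finite batch of messages rather than on a live stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch: these names are illustrative, not Kafka classes.
class PartitionMerge {

  static final class Message {
    final long timestamp;   // assumed to be set by the producer
    final String payload;

    Message(long timestamp, String payload) {
      this.timestamp = timestamp;
      this.payload = payload;
    }
  }

  // The next unread message of one partition, plus the remainder of it.
  private static final class Cursor {
    Message head;
    final Iterator<Message> rest;

    Cursor(Iterator<Message> it) {
      this.rest = it;
      this.head = it.next();
    }
  }

  // K-way merge of partitions that are each already ordered by timestamp.
  static List<Message> mergeByTimestamp(List<List<Message>> partitions) {
    PriorityQueue<Cursor> queue = new PriorityQueue<>(
        Comparator.comparingLong((Cursor c) -> c.head.timestamp));
    for (List<Message> partition : partitions) {
      if (!partition.isEmpty()) {
        queue.add(new Cursor(partition.iterator()));
      }
    }
    List<Message> merged = new ArrayList<>();
    while (!queue.isEmpty()) {
      Cursor c = queue.poll();
      merged.add(c.head);
      if (c.rest.hasNext()) {
        c.head = c.rest.next();
        queue.add(c); // re-insert the cursor keyed on its new head
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    List<Message> p0 = Arrays.asList(new Message(1L, "a"), new Message(4L, "d"));
    List<Message> p1 = Arrays.asList(new Message(2L, "b"), new Message(3L, "c"));
    for (Message m : mergeByTimestamp(Arrays.asList(p0, p1))) {
      System.out.println(m.timestamp + " " + m.payload);
    }
  }
}
```

Kafka itself orders a partition by arrival, so the timestamp assumption only holds if producers write to each partition in timestamp order.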

Figure 5.14 shows a conceptual model of how Kafka works and figure 5.15 shows an example of how partitions could be distributed in an actual Kafka deployment.

Figure 5.14. Conceptual Kafka model showing producers, topics, partitions, and consumers

Figure 5.15. A physical Kafka model showing how partitions can be distributed across brokers

To support fault tolerance, topics can be replicated, which means that each partition can have a configurable number of replicas on different hosts. This provides increased fault tolerance and means that a single server dying isn’t catastrophic for your data or for the availability of your producers and consumers.



Technique 45 employs Kafka version 0.8 and the 0.8 branch of Camus.


This wraps up our quick dive into how Kafka works. For more details, please refer to Kafka’s online documentation.

Technique 45 Using Camus to copy Avro data from Kafka into HDFS

This technique is useful in situations where you already have data flowing in Kafka for other purposes and you want to land that data in HDFS.


You want to use Kafka as a data-delivery mechanism to get your data into HDFS.


Use Camus, a LinkedIn-developed solution for copying data in Kafka into HDFS.


Camus is an open source project developed by LinkedIn. Kafka is heavily deployed at LinkedIn, where Camus is used to copy data from Kafka into HDFS.

Out of the box, Camus supports two data formats in Kafka: JSON and Avro. In this technique we’re going to get Camus working with Avro data. Camus’s built-in support of Avro requires that Kafka publishers write the Avro data in a proprietary way, so for this technique we’re going to assume that you want to work with vanilla Avro-serialized data in Kafka.

There are three parts to getting this technique to work: you’ll first write some Avro data into Kafka, then you’ll write a simple class to help Camus deserialize your Avro data, and finally you’ll run a Camus job to perform the data import.

Writing data into Kafka

To get going, you’ll write some Avro records into Kafka. In the following code, you set up a Kafka producer by configuring some required Kafka properties, load some Avro records from file, and write them out to Kafka:[27]

27 The complete set of Kafka properties you can set can be viewed in the Kafka documentation: https://kafka.apache.org/documentation.html.

You can load the sample stock data into a Kafka topic called test with the following command:

$ hip hip.ch5.kafka.KafkaAvroWriter \

     --stocksfile test-data/stocks.txt \

     --broker-list localhost:9092 \

     --topic test

The Kafka console consumer can be used to verify that the data has been written to Kafka. This will dump the binary Avro data to your console:

$ kafka-console-consumer.sh \

    --zookeeper localhost:2181 \

    --topic test \

    --from-beginning

Once that’s done, you’re ready for the next part—writing some Camus code so that you can read these Avro records in Camus.

Writing a Camus decoder and schema registry

There are three Camus concepts that you need to understand:

·        Decoders —The decoder’s job is to convert raw data pulled from Kafka into a Camus format.

·        Encoders —Encoders serialize decoded data into the format that will be stored in HDFS.

·        Schema registry —The schema registry provides schema information about Avro data being encoded.

As mentioned earlier, Camus supports Avro data, but it does so in a way that requires Kafka producers to write data using the Camus KafkaAvroMessageEncoder class, which prefixes the Avro-serialized binary data with some proprietary data, presumably so that the decoder in Camus can verify that it was written by that class.

In this example you’re serializing using the straight Avro serialization, so you need to write your own decoder. Luckily this is simple to do:



You may have noticed that we wrote a specific Avro record into Kafka, but in Camus we’re reading the record as a generic Avro record, not a specific Avro record. This is due to the fact that the CamusWrapper class only supports generic Avro records. Otherwise, specific Avro records would have been simpler to work with, as you can work with generated code and have all the type-safety goodness that comes along with that.


The CamusWrapper object is an envelope for the data being extracted from Kafka. The reason this class exists is that it allows you to stick metadata into the envelope, such as a timestamp, a server name, and the service details. It’s highly recommended that any data you work with have some meaningful timestamp associated with each record (typically this would be the time at which the record was created or generated). You can then use a CamusWrapper constructor that accepts the timestamp as an argument:

public CamusWrapper(R record, long timestamp) { ... }

If the timestamp isn’t set, then Camus will create a new timestamp at the time the wrapper is created. This timestamp and other metadata is used in Camus when determining the HDFS location of output records. You’ll see an example of this shortly.

Next you need to write a schema registry so that the Camus Avro encoder knows the schema details for the Avro records being written to HDFS. When registering the schema, you also specify the name of the Kafka topic from which the Avro record was pulled:

import com.linkedin.camus.schemaregistry.AvroMemorySchemaRegistry;

import hip.ch5.avro.gen.Stock;

public class StockSchemaRegistry extends AvroMemorySchemaRegistry {

  public StockSchemaRegistry() {


    // register the schema for the topic

    super.register("test", Stock.SCHEMA$);

  }

}

That’s it for the coding side of things! Let’s move on and see Camus in action.

Running Camus

Camus runs as a MapReduce job on the Hadoop cluster where you want to import the Kafka data. You need to feed a bunch of properties to Camus, and you can do so using the command line, or alternatively using a properties file. We’ll use the properties file for this technique:

# comma-separated brokers in "host:port" format


# Name of the client as seen by kafka


# Top-level data output directory in HDFS


# HDFS location where you want to keep execution files,

# i.e. offsets, error logs, and count files


# Where completed Camus job output directories are kept,

# usually a sub-dir in the base.path


# The decoder class


# The HDFS serializer



# The schema registry



# Max hadoop tasks to use, each task can pull multiple topic partitions


As you can see from the properties, you don’t need to explicitly tell Camus which topics you want to import. Camus automatically communicates with Kafka to discover the topics (and partitions), and the current start and end offsets.

If you want control over exactly which topics are imported, you can whitelist (to limit the topics) or blacklist (to exclude topics) using kafka.whitelist.topics and kafka.blacklist.topics, respectively. Multiple topics can be specified using a comma as the delimiter. Regular expressions are also supported, as shown in the following example, which matches on topic “topic1” or any topics that start with “abc” followed by one or more digits. Blacklists can be specified using the exact same syntax for the value:

kafka.whitelist.topics=topic1,abc[0-9]+

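The matching rule just described (the topic “topic1”, or “abc” followed by one or more digits) can be tried out without a Kafka cluster. This is a minimal sketch, assuming that the comma-separated entries are combined into a single alternation and that a topic must match in full; the TopicFilter class is hypothetical, and Camus’s own matching code may differ in detail.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical sketch of whitelist matching; not Camus's implementation.
class TopicFilter {

  // Joins the entries into one alternation, e.g. "(topic1|abc[0-9]+)".
  static Pattern compile(String commaSeparated) {
    String alternation = Arrays.stream(commaSeparated.split(","))
        .map(String::trim)
        .collect(Collectors.joining("|", "(", ")"));
    return Pattern.compile(alternation);
  }

  // Keeps only the topics whose whole name matches the whitelist.
  static List<String> filter(List<String> topics, String whitelist) {
    Pattern pattern = compile(whitelist);
    return topics.stream()
        .filter(topic -> pattern.matcher(topic).matches())
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> topics = Arrays.asList("topic1", "topic10", "abc", "abc42", "xabc1");
    System.out.println(filter(topics, "topic1,abc[0-9]+"));
  }
}
```

A blacklist would use the same pattern with the filter condition inverted.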

Once your properties are all set, you’re ready to run the Camus job:

$ CAMUS_HOME=<your Camus directory>

$ HIP_HOME=<your Hadoop in Practice directory>

$ LIBJARS="$CAMUS_HOME/camus-example/target/


$ LIBJARS="$LIBJARS,$HIP_HOME/target/hip-2.0.0.jar"

$ export HADOOP_CLASSPATH=`echo ${LIBJARS} | sed s/,/:/g`

$ hadoop com.linkedin.camus.etl.kafka.CamusJob \

  -libjars ${LIBJARS} \

  -P $HIP_HOME/conf/camus-avro.conf

This will result in the Avro data landing in HDFS. Let’s take a look at what’s in HDFS:

$ hadoop fs -lsr /tmp/camus






The first file is the file that you’re interested in, as it contains the data that’s been imported. The other files are there for Camus’s housekeeping.

The data files in HDFS can be viewed using the AvroDump utility:

$ hip hip.util.AvroDump \

 --file /tmp/camus/dest/test/hourly/2014/03/03/01/test.

So what actually happened when the Camus job was running? The Camus import process is executed as a MapReduce job, as seen in figure 5.16.

Figure 5.16. A look at how a Camus job executes

As Camus tasks in MapReduce succeed, the Camus OutputCommitter (a MapReduce construct that allows for custom work to be performed upon task completion) atomically moves the tasks’ data files to the destination directory. The OutputCommitter additionally creates the offset files for all the partitions that the tasks were working on. It’s possible that other tasks in the same job may fail, but this doesn’t impact the state of tasks that succeed—the data and offset outputs of successful tasks will still exist, so that subsequent Camus executions will resume processing from the last-known successful state.

Next, let’s take a look at where Camus writes the imported data and how you can control the behavior.

Data partitioning

Earlier you saw the location where Camus imported the Avro data sitting in Kafka. Let’s take a closer look at the HDFS path structure, shown in figure 5.17, and see what you can do to determine the location.

Figure 5.17. Dissecting the Camus output path for exported data in HDFS

The date/time part of the path is determined by the timestamp extracted from the CamusWrapper. You’ll recall from our earlier discussion that you can extract timestamps from your records in Kafka in your MessageDecoder and supply them to the CamusWrapper, which will allow your data to be partitioned by dates that are meaningful to you, as opposed to the default, which is simply the time at which the Kafka record is read in MapReduce.

Camus supports a pluggable partitioner, which allows you to control the part of the path shown in figure 5.18.

Figure 5.18. The Camus partitioner path

The Camus Partitioner interface provides two methods that you must implement:

public interface Partitioner {

    /**

     * Encode partition values into a string, to be embedded

     * into the working filename.

     * Encoded values cannot use '/' or ':'.

     */

    String encodePartition(JobContext context, IEtlKey etlKey);

    /**

     * Return a string representing the partitioned directory

     * structure where the .avro files will be moved.

     *

     * For example, if you were using Hive style partitioning,

     * a timestamp based partitioning scheme would return

     *    topic-name/year=2012/month=02/day=04/hour=12

     */

    String generatePartitionedPath(JobContext context, String topic,

        int brokerId, int partitionId, String encodedPartition);

}

As an example, a custom partitioner could create a path that could be leveraged for Hive partitions.
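
The path-building logic of such a partitioner might look like the following. This is a minimal sketch that leaves out the Camus interface itself so that it compiles without Camus on the classpath; the HivePathSketch class, its partitionedPath method, and the use of UTC are all assumptions made for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch: builds the kind of string that a Hive-style
// generatePartitionedPath implementation could return.
class HivePathSketch {

  private static final DateTimeFormatter HIVE_STYLE =
      DateTimeFormatter.ofPattern("'year='yyyy'/month='MM'/day='dd'/hour='HH");

  // timestampMillis is the record timestamp; UTC is an assumption here.
  static String partitionedPath(String topic, long timestampMillis) {
    ZonedDateTime utc = Instant.ofEpochMilli(timestampMillis).atZone(ZoneOffset.UTC);
    return topic + "/" + HIVE_STYLE.format(utc);
  }

  public static void main(String[] args) {
    long ts = ZonedDateTime.of(2012, 2, 4, 12, 30, 0, 0, ZoneOffset.UTC)
        .toInstant().toEpochMilli();
    System.out.println(partitionedPath("topic-name", ts));
  }
}
```

Directories named in the key=value form can then be registered as partitions of an external Hive table.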


Camus provides a complete solution to landing data from Kafka in HDFS, and it takes care of maintaining state and error handling when things go wrong. It can be easily automated by integrating it with Azkaban or Oozie, and it provides some simple data-management facilities by organizing HDFS data based on the time that messages are ingested. It’s worth mentioning that when it comes to ETL, it’s bare-bones in its features compared to Flume.

Kafka comes bundled with a mechanism that pulls data into HDFS. It has a KafkaETLInputFormat input format class that can be used to pull data from Kafka in a MapReduce job. It requires you to write the MapReduce job to perform the import, but the advantage is that you can use the data directly in your MapReduce flow, as opposed to using HDFS as intermediary storage for your data.

The Flume project is also in the process of adding a Kafka source and sink, although at the time of writing that work is still in progress.[28] Once this is ready for production, you’ll be able to leverage all the other goodies that Flume offers, such as Morphlines and Solr indexing as part of moving Kafka data into Hadoop.

28 For more details on Flume and Kafka see https://issues.apache.org/jira/browse/FLUME-2242.

That concludes our examination of how to move data into Hadoop. We covered a broad range of data types, tools, and technologies. Next we’re going to flip things around and look at how to get data that resides in Hadoop out to other systems, such as filesystems and other stores.

5.3. Moving data out of Hadoop

Once you’ve used Hadoop to perform some critical function, be it data mining or data aggregations, the next step is typically to externalize that data into other systems in your environment. For example, it’s common to rely on Hadoop to perform offline aggregations on data that’s pulled from your real-time systems, and then to feed the derived data back into your real-time systems. A more concrete example would be building recommendations based on user-behavior patterns.

This section examines some of the more common scenarios where you want to get data out of Hadoop, and the tools that will help you with that work. We’ll start with a look at the lower-level tools that exist, most of which are built into Hadoop, and then go on to look at how to push data to relational databases and HBase.

To start off, we’ll look at how to copy files out of Hadoop using the command line.

5.3.1. Roll your own egress

This section covers some built-in mechanisms in Hadoop for copying data out of HDFS. These techniques can be executed manually, or they can be automated using a scheduling system such as Azkaban, Oozie, or even cron.

Technique 46 Using the CLI to extract files

Imagine that you’ve run some jobs in Hadoop to aggregate some data, and now you want to get it out. One method you can use is the HDFS command-line interface (CLI) to pull out directories and files into your local filesystem. This technique covers some basic CLI commands that can help you out.


You want to copy files from HDFS to a local filesystem using the shell.


The HDFS CLI can be used for one-off moves, or the same commands can be incorporated into scripts for more regularly utilized moves.


Copying a file from HDFS to local disk is achieved via the hadoop command:

$ hadoop fs -get hdfs-file.txt local-file.txt

The behavior of the Hadoop get command differs from the Linux cp command: in Linux, if the destination already exists, it’s overwritten; in Hadoop the copy fails with an error:

get: `local-file.txt': File exists

The -f option must be added to force the file to be overwritten:

$ hadoop fs -get -f hdfs-file.txt local-file.txt

Much like with the Linux cp command, multiple files can be copied using the same command. In this case, the final argument must be the directory in the local filesystem into which the HDFS files are copied:

$ hadoop fs -get hdfs-file1.txt hdfs-file2.txt /local/dest/

You’ll often need to copy a large number of files from HDFS to local disk; an example is a MapReduce job output directory that contains a file for each task. If you’re using a file format that can be concatenated, you can use the -getmerge command to combine multiple files into a single local file (add the -nl option if you want a newline appended to the end of each file during concatenation):

$ hadoop fs -getmerge hdfs-dir/part* /local/output.txt

There are many more operations supported by the fs command—to see the full list, run the command without any options.

The challenge with using the CLI is that it’s very low-level, and it won’t be able to assist you with your automation needs. Sure, you could use the CLI within shell scripts, but once you graduate to more sophisticated programming languages, forking a process for every HDFS command isn’t ideal. In this situation you may want to look at using the REST, Java, or C HDFS APIs. The next technique looks at the REST API.

Technique 47 Using REST to extract files

Using the CLI is handy for quickly running commands and for scripting, but it incurs the overhead of forking a separate process for each command, which you’ll probably want to avoid, especially if you’re interfacing with HDFS in a programming language. This technique covers working with HDFS in languages other than Java.


You want to be able to interact with HDFS from a programming language that doesn’t have a native interface to HDFS.


Use Hadoop’s WebHDFS interface, which offers a full-featured REST API for HDFS operations.


Before you get started, you’ll need to enable WebHDFS on your cluster—see technique 34 for details on how to do that.

Let’s start by creating a file in HDFS using the CLI:

$ echo "the cat sat on the mat" | hadoop fs -put - /tmp/hdfs-file.txt

Reading the file from HDFS is a matter of specifying OPEN as the operation:

$ curl -L "http://<namenode-host>:<port>/webhdfs/v1/tmp/hdfs-file.txt?op=OPEN"

the cat sat on the mat


Consult technique 34 for additional information on using WebHDFS, including how it can be leveraged in different programming languages.
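
As a rough illustration of calling WebHDFS from a language other than Java, the following Python sketch issues the same OPEN request using only the standard library. The function names, the example host, and port 50070 (the usual NameNode HTTP port in Hadoop 2) are assumptions made for illustration; the /webhdfs/v1 prefix and the op=OPEN parameter are part of the WebHDFS REST API.

```python
from urllib.parse import quote, urlencode
from urllib.request import urlopen


def webhdfs_open_url(host, port, hdfs_path, extra_params=None):
    """Build the WebHDFS URL for an OPEN (read) request on hdfs_path."""
    params = {"op": "OPEN"}
    params.update(extra_params or {})  # for example {"user.name": "alice"}
    return "http://{}:{}/webhdfs/v1{}?{}".format(
        host, port, quote(hdfs_path), urlencode(params))


def read_hdfs_file(host, port, hdfs_path, extra_params=None):
    """Return the file contents as bytes.

    urlopen follows the redirect from the NameNode to a DataNode,
    which is the job of the -L flag in the curl command.
    """
    url = webhdfs_open_url(host, port, hdfs_path, extra_params)
    with urlopen(url) as response:
        return response.read()
```

With a reachable cluster, read_hdfs_file("namenode.example.com", 50070, "/tmp/hdfs-file.txt") would be expected to return the bytes of the file created earlier. Pointing the host and port at an HttpFS server instead should work the same way, because the URL syntax is shared.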

Technique 48 Reading from HDFS when behind a firewall

Production Hadoop environments are often locked down to protect the data residing in these clusters. Part of the security procedures could include putting your cluster behind a firewall, and this can be a nuisance if the destination for your data is outside of the firewall. This technique looks at using the HttpFS gateway to provide HDFS access over port 80, which is often opened up on firewalls.


You want to pull data out of HDFS, but you’re sitting behind a firewall that’s restricting access to HDFS.


Use the HttpFS gateway, which is a standalone server that provides access to HDFS over HTTP. Because it’s a separate service and it’s HTTP, it can be configured to run on any host that has access to the Hadoop nodes, and you can open a firewall rule to allow traffic to the service.


HttpFS is useful because not only can you use REST to access HDFS, but it has a complete Hadoop filesystem implementation, which means you can use the CLI and native HDFS Java clients to talk to HDFS. Consult technique 35 for instructions on how to get HttpFS up and running.

Once it’s running, you can issue the same curl commands that you used in the previous technique with WebHDFS; the only difference is the URL host and port, which need to point to where your HttpFS server is deployed. This is one of the nice things about the HttpFS gateway: the syntax is exactly the same.

To dump the contents of the file /tmp/hdfs-file.txt, you’d do the following:

$ curl -L "http://<httpfs-host>:<port>/webhdfs/v1/tmp/hdfs-file.txt?op=OPEN"

the cat sat on the mat


Swing on over to technique 35 for additional details on how HttpFS works.

Technique 49 Mounting Hadoop with NFS

Often it’s a lot easier to work with Hadoop data if it’s accessible as a regular mount to your filesystem. This allows you to use existing scripts, tools, and programming languages and easily interact with your data in HDFS. This section looks at how you can easily copy data out of HDFS using an NFS mount.


You want to treat HDFS as a regular Linux filesystem and use standard Linux tools to interact with HDFS.


Use Hadoop’s NFS implementation to access data in HDFS.


Technique 36 has setup instructions for NFS access to HDFS. Once that’s set up, you can perform normal filesystem operations such as copying files from HDFS to a local filesystem. The following example shows this, assuming that HDFS is mounted under /hdfs:

$ cp /hdfs/tmp/foo.txt ~/

For more details on how NFS works in Hadoop, head on over to technique 36.

Technique 50 Using DistCp to copy data out of Hadoop

Imagine that you have a large amount of data you want to move out of Hadoop. With most of the techniques in this section, you have a bottleneck because you’re funneling the data through a single host, which is the host on which you’re running the process. To optimize data movement as much as possible, you want to leverage MapReduce to copy data in parallel. This is where DistCp comes into play, and this technique examines one way you can pull out data to an NFS mount.


You want to efficiently pull data out of Hadoop and parallelize the copy.


Use DistCp.


Technique 37 covers DistCp in detail, including how to copy data between different Hadoop clusters. But DistCp can’t be used to copy data from Hadoop to a local filesystem (or vice versa), because DistCp runs as a MapReduce job, and your cluster won’t have access to your local filesystem. Depending on your situation you have a couple of options:

·        Use the HDFS File Slurper to copy the local files.

·        Copy your files to an NFS that’s also available to all the DataNodes in your cluster.

If you go with the second option, you can use DistCp and write to the NFS mount on each DataNode, an example of which follows:

$ hadoop distcp \

  hdfs://src \


Note that your NFS system may not handle a large number of parallel reads or writes, so you’ll likely want to run this with a smaller number of mappers than the default of 20—the following example runs with 5 mappers:

$ hadoop distcp \

  -m 5 \

  hdfs://src \


Technique 51 Using Java to extract files

Let’s say you’ve generated a number of Lucene indexes in HDFS, and you want to pull them out to an external host. Maybe you want to manipulate the files in some way using Java. This technique shows how the Java HDFS API can be used to read data in HDFS.


You want to copy files in HDFS to the local filesystem.


Use Hadoop’s filesystem API to copy data out of HDFS.


The HDFS Java API is nicely integrated with Java’s I/O model, which means you can work with regular input streams and output streams for I/O.

To start off, you need to create a file in HDFS using the command line:

$ echo "hello world" | hadoop fs -put - hdfs-file.txt

Now copy that file to the local filesystem using the command line:

$ hadoop fs -get hdfs-file.txt local-file.txt

Let’s explore how you can replicate this copy in Java. There are two main parts to writing the code to do this. The first part is getting a handle to the FileSystem and opening the HDFS file, which gives you an InputStream. The second part is copying the data from that InputStream to an OutputStream for the local file.

You can see how this code works in practice by running the following command:

$ echo "the cat" | hadoop fs -put - hdfs-file.txt

$ hip hip.ch5.CopyHdfsFileToLocal \

    --input hdfs-file.txt \

    --output local-file.txt

$ cat local-file.txt

the cat

So far we’ve covered the low-level tools that are bundled with Hadoop to help you pull out data. Next we’ll look at a method for near-continuous movement of data from HDFS to a local filesystem.

5.3.2. Automated file egress

Up until now you’ve seen different options for copying data out of HDFS. Most of these mechanisms don’t have automation or scheduling capabilities; they’re ultimately low-level methods for accessing data. If you’re looking to automate your data copy, you can wrap one of these low-level techniques inside of a scheduling engine such as cron or Quartz. However, if you’re looking for out-of-the-box automation, then this section is for you.

Earlier in this chapter we looked at two mechanisms that can move semistructured and binary data into HDFS: the open source HDFS File Slurper project, and Oozie, which triggers a data ingress workflow. The challenge in using a local filesystem for egress (and ingress for that matter) is that map and reduce tasks running on clusters won’t have access to the filesystem on a specific server. You have three broad options for moving data from HDFS to a filesystem:

·        You can host a proxy tier on a server, such as a web server, which you would then write to using MapReduce.

·        You can write to the local filesystem in MapReduce and then, as a postprocessing step, trigger a script on the remote server to move that data.

·        You can run a process on the remote server to pull data from HDFS directly.

The third option is the preferred approach because it’s the simplest and most efficient, and as such it’s the focus of this section. We’ll look at how you can use the HDFS File Slurper to automatically move files from HDFS out to a local filesystem.

Technique 52 An automated mechanism to export files from HDFS

Let’s say you have files being written in HDFS by MapReduce, and you want to automate their extraction to a local filesystem. This kind of feature isn’t supported by any Hadoop tools, so you have to look elsewhere.


You want to automate moving files from HDFS to a local filesystem.


The HDFS File Slurper can be used to copy files from HDFS to a local filesystem.


The goal here is to use the HDFS File Slurper project (https://github.com/alexholmes/hdfs-file-slurper) to assist with the automation. We covered the HDFS File Slurper in detail in technique 40—please read that section before continuing with this technique.

In addition to the way you used it in technique 40, the HDFS Slurper also supports moving data from HDFS out to a local directory. All you need to do is flip around the source and destination directories, as you can see from the following subsection of the Slurper’s configuration file:

SRC_DIR = hdfs:/tmp/slurper/in

WORK_DIR = hdfs:/tmp/slurper/work

COMPLETE_DIR = hdfs:/tmp/slurper/complete

ERROR_DIR = hdfs:/tmp/slurper/error

DEST_STAGING_DIR = file:/tmp/slurper/stage

DEST_DIR = file:/tmp/slurper/dest

You’ll notice that not only is the source directory in HDFS, but also the work, complete, and error directories are there. This is because you need to be able to atomically move files between directories without incurring the expensive overhead of copying the files across filesystems.
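
The same reasoning applies on the destination side, where a staging directory sits next to the final directory. The following Python sketch shows the general copy-then-rename pattern on a local filesystem. It's an illustration of the idea rather than the Slurper's actual code, and the function name and temporary directories are made up for the example.

```python
import os
import shutil
import tempfile


def copy_then_publish(src_file, staging_dir, dest_dir):
    """Copy src_file into staging_dir, then rename it into dest_dir."""
    name = os.path.basename(src_file)
    staged = os.path.join(staging_dir, name)
    shutil.copyfile(src_file, staged)     # slow step; the staged file may be partial
    final = os.path.join(dest_dir, name)
    os.replace(staged, final)             # a rename, atomic within one filesystem
    return final


# Throwaway directories standing in for DEST_STAGING_DIR and DEST_DIR.
root = tempfile.mkdtemp()
stage = os.path.join(root, "stage")
dest = os.path.join(root, "dest")
os.mkdir(stage)
os.mkdir(dest)

src = os.path.join(root, "part-00000")
with open(src, "w") as f:
    f.write("the cat sat on the mat\n")

published = copy_then_publish(src, stage, dest)
print(os.listdir(stage), os.listdir(dest))
```

A process watching the destination directory only ever sees complete files, because the expensive copy happens in the staging directory and the final step is a rename.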


At this point you may wonder how you can trigger the Slurper to copy a directory that was just written with a MapReduce job. When a MapReduce job completes successfully, it creates a file called _SUCCESS in the job output directory. This would seem like the perfect trigger to kick off an egress process to copy that content to a local filesystem. As it turns out, Oozie has a mechanism that can trigger a workflow when it detects these Hadoop “success” files, but again the challenge here is that any work performed by Oozie is performed in MapReduce, so it can’t be used to perform the transfer directly.

You could write your own script that polls HDFS for completed directories and then triggers a file copy process. That file copy process could be the Slurper or a simple hadoop fs -get command if the source files need to be kept intact.
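
A polling script of that kind might look roughly like the following Python sketch. It shells out to hadoop fs -test -e to check for the _SUCCESS marker and to hadoop fs -get to perform the copy. The function names, the polling interval, and the injectable run and sleep parameters (there so the logic can be exercised without a cluster) are assumptions for illustration.

```python
import subprocess
import time


def hdfs_path_exists(path, run=subprocess.call):
    """True if `hadoop fs -test -e path` exits with status 0."""
    return run(["hadoop", "fs", "-test", "-e", path]) == 0


def export_when_complete(job_dir, local_dir, run=subprocess.call,
                         poll_seconds=60, max_polls=None, sleep=time.sleep):
    """Wait for job_dir/_SUCCESS, then copy job_dir to local_dir.

    Returns True if the copy ran and succeeded, or False if max_polls
    checks passed without the marker appearing (None polls forever).
    """
    marker = job_dir.rstrip("/") + "/_SUCCESS"
    polls = 0
    while not hdfs_path_exists(marker, run):
        polls += 1
        if max_polls is not None and polls >= max_polls:
            return False
        sleep(poll_seconds)
    # The source files are left intact in HDFS, as with any -get.
    return run(["hadoop", "fs", "-get", job_dir, local_dir]) == 0
```

Calling export_when_complete("/jobs/output", "/data/output") from cron or a long-running process would then copy the directory once the job has finished. Forking a process per check is tolerable here because the polling is infrequent.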

In the next topic we’ll look at writing data from Hadoop out to relational databases.

5.3.3. Databases

Databases are usually the target of Hadoop data egress in one of two circumstances: either when you move data back into production databases to be used by production systems, or when you move data into OLAP databases to perform business intelligence and analytics functions.

In this section we’ll use Apache Sqoop to export data from Hadoop to a MySQL database. Sqoop is a tool that simplifies database imports and exports. Sqoop is covered in detail in technique 42.

We’ll walk through the process of exporting data from HDFS to MySQL using Sqoop. We’ll also cover methods for using the regular connector, as well as how to perform bulk exports using the fast connector.

Technique 53 Using Sqoop to export data to MySQL

Hadoop excels at performing operations at scales that defeat most relational databases, so it’s common to extract OLTP data into HDFS, perform some analysis, and then export it back out to a database.


You want to write data to relational databases, and at the same time ensure that writes are idempotent.


This technique covers how Sqoop can be used to export text files to a relational database and also looks at how Sqoop can be configured to work with files with custom field and record delimiters. We’ll also cover idempotent exports to make sure that failed exports don’t leave your database in an inconsistent state.


This technique assumes you’ve already followed the instructions in technique 42 to install MySQL and create the schema.

Sqoop exports require that the database table you’re exporting into already exists. Sqoop can support both inserts and updates of rows in the table.

Exporting data to a database shares many of the arguments that we examined in the import section. The difference is that exports require the --export-dir argument to determine the HDFS directory to export. You’ll also create another options file for exports to keep from insecurely supplying the password on the command line:

$ cat > ~/.sqoop_export_options.txt << EOF









$ chmod 700 ~/.sqoop_export_options.txt

Your first step will be to import data from MySQL into HDFS to ensure you have a good starting point, as shown in the following commands:

$ hadoop fs -rmr stocks

$ sqoop --options-file ~/.sqoop_import_options.txt \

  --connect jdbc:mysql://localhost/sqoop_test --table stocks

The result of the Sqoop import is a number of CSV files in HDFS, as you can see in the following code:

$ hadoop fs -cat stocks/part-m-00000 | head




For the Sqoop export from HDFS to MySQL, you’ll specify that the target table should be stocks_export and that it should export data from the HDFS stocks directory:

$ sqoop --options-file ~/.sqoop_export_options.txt \

    --export-dir stocks \

    --table stocks_export

By default, Sqoop exports will perform an INSERT into the target database table. It can support updates with the --update-mode argument. A value of updateonly means that only rows with a matching key are updated; records with no matching key aren’t written to the table. A value of allowinsert results in an insert if a matching key doesn’t exist. The table column name that’s used to perform the update is supplied in the --update-key argument.

The following example indicates that only an update should be attempted, using the primary key for the update:

$ sqoop --options-file ~/.sqoop_export_options.txt \

    --update-mode updateonly \

    --update-key id \

    --export-dir stocks \

    --table stocks_export
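
To make the difference between the two update modes concrete, the following Python sketch mimics their effect on a single record. It uses an in-memory SQLite table as a stand-in for MySQL, the export_row function and the two-column table are hypothetical, and it illustrates the semantics only, not the SQL that Sqoop generates.

```python
import sqlite3


def export_row(conn, row_id, price, update_mode="updateonly"):
    """Apply one exported record keyed on id; return what happened to it."""
    cur = conn.execute(
        "UPDATE stocks_export SET price = ? WHERE id = ?", (price, row_id))
    if cur.rowcount > 0:
        return "updated"
    if update_mode == "allowinsert":
        conn.execute(
            "INSERT INTO stocks_export (id, price) VALUES (?, ?)",
            (row_id, price))
        return "inserted"
    return "skipped"  # updateonly: no matching key, so nothing is written


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stocks_export (id INTEGER PRIMARY KEY, price REAL)")
conn.execute("INSERT INTO stocks_export VALUES (1, 10.0)")

results = [
    export_row(conn, 1, 11.0),                 # key 1 exists
    export_row(conn, 2, 20.0),                 # key 2 missing, updateonly
    export_row(conn, 2, 20.0, "allowinsert"),  # key 2 missing, allowinsert
]
print(results)  # ['updated', 'skipped', 'inserted']
```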

Input data formatting

You can use several options to override the default Sqoop settings for parsing the input data. Table 5.7 lists these options.

Table 5.7. Formatting options for input data

Argument: Description

--input-enclosed-by <char>: The field enclosing character. Every field must be enclosed with this character. (If the field enclosing character can occur inside a field, the --input-optionally-enclosed-by option should be used to enclose that field.)

--input-escaped-by <char>: Escape character, where the next character is extracted literally and isn’t parsed.

--input-fields-terminated-by <char>: The field separator.

--input-lines-terminated-by <char>: The line terminator.

--input-optionally-enclosed-by <char>: The field enclosing character. This argument is the same as --input-enclosed-by, except that it’s applied only to fields that contain the field separator character. For example, in CSV it’s common for fields to be enclosed by double quotes only when they contain commas.

Idempotent exports

The Sqoop map tasks that perform the exports use multiple transactions for their database writes. If a Sqoop export MapReduce job fails, your table could contain partial writes. For idempotent database writes, Sqoop can be instructed to perform the MapReduce writes to a staging table. After successful job completion, the contents of the staging table are moved to the target table in a single transaction, so a failed job never leaves partial data in the target table and can simply be rerun. You can see the sequence of events in figure 5.19.

Figure 5.19. Sqoop staging sequence of events, which helps ensure idempotent writes
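
The staging sequence can be imitated on a small scale to see why it protects the target table. The following Python sketch uses an in-memory SQLite database as a stand-in for MySQL, with made-up rows and a hypothetical export_via_staging function; the fail_after parameter simulates a job that dies partway through. It illustrates the pattern only and isn't Sqoop's implementation.

```python
import sqlite3


def export_via_staging(conn, rows, fail_after=None):
    """Load rows into stocks_staging, then move them to stocks_export."""
    conn.execute("DELETE FROM stocks_staging")  # like --clear-staging-table
    conn.commit()
    for i, row in enumerate(rows):
        if fail_after is not None and i >= fail_after:
            raise RuntimeError("simulated task failure")
        conn.execute("INSERT INTO stocks_staging VALUES (?, ?)", row)
        conn.commit()  # each write is its own transaction
    with conn:  # one transaction moves everything to the target
        conn.execute("INSERT INTO stocks_export SELECT * FROM stocks_staging")
        conn.execute("DELETE FROM stocks_staging")


def count(conn, table):
    return conn.execute("SELECT COUNT(*) FROM " + table).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE stocks_export (symbol TEXT, price REAL)")
conn.execute("CREATE TABLE stocks_staging (symbol TEXT, price REAL)")
rows = [("AAA", 1.0), ("BBB", 2.0), ("CCC", 3.0)]  # made-up sample data

try:
    export_via_staging(conn, rows, fail_after=2)  # partial writes reach staging only
except RuntimeError:
    pass
count_after_failure = count(conn, "stocks_export")

export_via_staging(conn, rows)  # the rerun starts from an empty staging table
count_after_retry = count(conn, "stocks_export")
print(count_after_failure, count_after_retry)  # 0 3
```

The failed attempt leaves the target table untouched, and the rerun produces exactly one copy of each row.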

In the following example, the staging table is stocks_staging, and you’re also telling Sqoop to clear it out before the MapReduce job starts with the --clear-staging-table argument:

$ sqoop --options-file ~/.sqoop_export_options.txt \

        --export-dir stocks \

        --table stocks_export \

        --staging-table stocks_staging \

        --clear-staging-table

Direct exports

You used the fast connector in the import technique, which was an optimization that used the mysqldump utility. Sqoop exports also support using the fast connector, which uses the mysqlimport tool. As with mysqldump, all of the nodes in your cluster need to have mysqlimport installed and available in the path of the user that’s used to run MapReduce tasks. And as with the import, the --direct argument enables utilization of the fast connectors:

$ sqoop --options-file ~/.sqoop_export_options.txt \

        --direct \

        --export-dir stocks \

        --table stocks_export

Idempotent exports with mysqlimport

Sqoop doesn’t support using fast connectors in conjunction with a staging table, which is how you achieve idempotent writes with regular connectors. But it’s still possible to achieve idempotent writes with fast connectors with a little extra work at your end. You need to use the fast connector to write to a staging table, and then trigger the INSERT statement, which atomically copies the data into the target table. The steps would look like the following:

$ sqoop --options-file ~/.sqoop_export_options.txt \

        --direct \

        --export-dir stocks \

        --table stocks_staging

$ mysql --host=localhost \

        --user=hip_sqoop_user \

        --password=password \

        -e "INSERT INTO stocks_export (SELECT * FROM stocks_staging)"\


This breaks the earlier rule about exposing credentials on the command line, but it’s easy to write a wrapper script that can read these settings from a configuration file.
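
One possible shape for such a wrapper is sketched below in Python. Rather than parsing the credentials itself, it hands the mysql client a private option file through --defaults-extra-file, which has to be the first option on the mysql command line. The function names, the file locations, and the sqoop_test database name passed in the usage note are assumptions for illustration.

```python
import subprocess


def build_commands(sqoop_options_file, mysql_defaults_file, database,
                   export_dir, staging_table, target_table):
    """Return the Sqoop and mysql command lines for a two-step export."""
    sqoop_cmd = [
        "sqoop", "--options-file", sqoop_options_file,
        "--direct",
        "--export-dir", export_dir,
        "--table", staging_table,
    ]
    mysql_cmd = [
        "mysql",
        "--defaults-extra-file=" + mysql_defaults_file,  # holds user and password
        "-e", "INSERT INTO {} (SELECT * FROM {})".format(target_table,
                                                         staging_table),
        database,
    ]
    return sqoop_cmd, mysql_cmd


def run_export(*args):
    """Run both steps; check_call raises and stops if a step fails."""
    for cmd in build_commands(*args):
        subprocess.check_call(cmd)
```

The option file would contain a [client] section with user and password entries and should be readable only by its owner. A call such as run_export("/home/hip/.sqoop_export_options.txt", "/home/hip/.my_export.cnf", "sqoop_test", "stocks", "stocks_staging", "stocks_export") then keeps the password off the command line.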


Sqoop provides a simplified usage model compared to using the DBOutputFormat class that’s provided in MapReduce. But using DBOutputFormat will give you the added flexibility to transform or preprocess your data in the same MapReduce job that performs the database export. The advantage of Sqoop is that it doesn’t require you to write any code, and it has some useful notions, such as staging, to help you achieve your idempotent goals.

The final step in this section, and in the chapter, is to look at exporting data to HBase.

5.3.4. NoSQL

MapReduce is a powerful and efficient way to bulk-load data into external systems. So far we’ve covered how Sqoop can be used to load relational data, and now we’ll look at NoSQL systems, and specifically HBase.

Apache HBase is a distributed key/value, column-oriented data store. Earlier in this chapter we looked at how to import data from HBase into HDFS, as well as how to use HBase as a data source for a MapReduce job.

The most efficient way to load data into HBase is via its built-in bulk-loading mechanism, which is described in detail on the HBase wiki page titled “Bulk Loading” at https://hbase.apache.org/book/arch.bulk.load.html. But this approach bypasses the write-ahead log (WAL), which means that the data being loaded isn’t replicated to slave HBase nodes.

HBase also comes with an org.apache.hadoop.hbase.mapreduce.Import class, which will load HBase tables from HDFS, similar to how the equivalent export worked earlier in this chapter. But you must have your data in SequenceFile form, which has disadvantages, including no support for versioning.

You can also use the TableOutputFormat class in your own MapReduce job to export data to HBase, but this approach is slower than the bulk-loading tool.

We’ve now concluded our examination of Hadoop egress tools. We covered how you can use the HDFS File Slurper to move data out to a filesystem and how to use Sqoop for idempotent writes to relational databases, and we wrapped up with a look at ways to move Hadoop data into HBase.

5.4. Chapter summary

Moving data in and out of Hadoop is a critical part of the Hadoop architecture. In this chapter we covered a broad spectrum of techniques that you can use to perform data ingress and egress activities and that work with a variety of data sources. Of note, we covered Flume, a data collection and distribution solution, Sqoop, a tool for moving relational data in and out of Hadoop, and Camus, a tool for ingesting Kafka data into HDFS.

Now that your data is tucked away in HDFS, it’s time to look at some interesting processing patterns that you can apply to that data.