Hadoop in Practice, Second Edition (2015)
Part 3. Big data patterns
Chapter 7. Utilizing data structures and algorithms at scale
This chapter covers
· Representing and using data structures such as graphs, HyperLogLog, and Bloom filters in MapReduce
· Applying algorithms such as PageRank and semi-joins to large amounts of data
· Learning how social network companies recommend making connections with people outside your network
In this chapter we’ll look at how you can implement algorithms in MapReduce to work with internet-scale data. We’ll focus on nontrivial data, which is commonly represented using graphs.
We’ll also look at how you can use graphs to model connections between entities, such as relationships in a social network. We’ll run through a number of useful algorithms that can be performed over graphs, such as shortest path and friends-of-friends (FoF), to help expand the interconnectedness of a network, and PageRank, which looks at how to determine the popularity of web pages.
You’ll learn how to use Bloom filters, whose unique space-saving properties make them handy for solving distributed system problems in P2P (peer-to-peer) and distributed databases. We’ll also create Bloom filters in MapReduce and then look at their usefulness for filtering.
You’ll also learn about another approximate data structure called HyperLogLog, which provides approximate unique counts that are invaluable in aggregation pipelines.
A chapter on scalable algorithms wouldn’t be complete without mention of sorting and joining algorithms, which are covered in chapter 6.
Let’s kick things off with a look at how you can model graphs in MapReduce.
7.1. Modeling data and solving problems with graphs
Graphs are mathematical constructs that represent an interconnected set of objects. They’re used to represent data such as the hyperlink structure of the internet, social networks (where they represent relationships between users), and internet routing to determine optimal paths for forwarding packets.
A graph consists of a number of nodes (formally called vertices) and links (informally called edges) that connect nodes together. Figure 7.1 shows a graph with nodes and edges.
Figure 7.1. A small graph with highlighted nodes and edges
The edges can be directed (implying a one-way relationship) or undirected. For example, you would use a directed graph to model relationships between users in a social network, because relationships are not always bidirectional. Figure 7.2 shows examples of directed and undirected graphs.
Figure 7.2. Directed and undirected graphs
Directed graphs, where the edges have a direction, can be cyclic or acyclic. In cyclic graphs, it’s possible for a vertex to reach itself by traversing a sequence of edges. In an acyclic graph, it’s not possible for a vertex to traverse a path to reach itself. Figure 7.3 shows examples of cyclic and acyclic graphs.
Figure 7.3. Cyclic and acyclic graphs
To start working with graphs, you’ll need to be able to represent them in your code. So what are the common methods used to represent these graph structures?
7.1.1. Modeling graphs
Two common ways of representing graphs are with adjacency matrices and adjacency lists.
Adjacency matrix
With an adjacency matrix, you represent a graph as an N x N square matrix M, where N is the number of nodes and Mij represents an edge between nodes i and j.
Figure 7.4 shows a directed graph representing connections in a social graph. The arrows indicate one-way relationships between two people. The adjacency matrix shows how this graph would be represented.
Figure 7.4. An adjacency matrix representation of a graph
The disadvantage of adjacency matrices is that they model both the existence and absence of relationships, which makes them dense data structures requiring more space than adjacency lists.
Adjacency list
Adjacency lists are similar to adjacency matrices, except that they don’t model the lack of relationships. Figure 7.5 shows how you’d represent a graph using an adjacency list.
Figure 7.5. An adjacency list representation of a graph
The advantage of the adjacency list is that it offers a sparse representation of the data, which is good because it requires less space. It also fits well when representing graphs in MapReduce because the key can represent a vertex, and the values are a list of vertices that denote a directed or undirected relationship with that vertex.
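To make the two representations concrete, here’s a small standalone Java sketch (the graph and names are illustrative, not taken from the book’s code) that builds both structures for the same directed graph:

```java
import java.util.*;

public class GraphRepresentations {
    public static void main(String[] args) {
        // A small directed graph with 4 nodes: 0->1, 0->3, 1->2, 2->0, 3->2
        int n = 4;
        int[][] edges = {{0, 1}, {0, 3}, {1, 2}, {2, 0}, {3, 2}};

        // Adjacency matrix: O(N^2) space, models presence AND absence of edges
        boolean[][] matrix = new boolean[n][n];
        for (int[] e : edges) {
            matrix[e[0]][e[1]] = true;
        }

        // Adjacency list: space proportional to the number of edges only
        Map<Integer, List<Integer>> list = new HashMap<>();
        for (int[] e : edges) {
            list.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
        }

        System.out.println(matrix[0][1]); // true: edge 0->1 exists
        System.out.println(matrix[1][0]); // false: the matrix also stores the non-edge
        System.out.println(list.get(0));  // [1, 3]
    }
}
```

Note how the matrix allocates N × N cells regardless of how many edges exist, whereas the list stores only the edges that are present.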
Next up we’ll cover three graph algorithms, starting off with the shortest-path algorithm.
7.1.2. Shortest-path algorithm
The shortest-path algorithm is a common problem in graph theory, where the goal is to find the shortest route between two nodes. Figure 7.6 shows an example of this algorithm on a graph where the edges don’t have a weight, in which case the shortest path is the path with the smallest number of hops or intermediary nodes between the source and destination.
Figure 7.6. Example of shortest path between nodes A and E
Applications of this algorithm include determining the shortest route between two addresses in traffic mapping software, routers computing the shortest path tree for each route, and social networks determining connections between users.
Technique 67 Find the shortest distance between two users
Dijkstra’s algorithm is a shortest-path algorithm commonly taught in undergraduate computer science courses. A basic implementation uses a sequential iterative process to traverse the entire graph from the starting node, as seen in the algorithm presented in figure 7.7.
Figure 7.7. Pseudocode for Dijkstra’s algorithm
The basic algorithm doesn’t scale to graphs that are too large to fit in memory, and it’s also sequential and not optimized for parallel processing.
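For reference, here’s a minimal single-machine sketch of Dijkstra’s algorithm using a priority queue (my own illustration of the sequential approach, not the pseudocode in figure 7.7):

```java
import java.util.*;

public class Dijkstra {
    // Computes shortest distances from a source node over a weighted digraph
    // expressed as an adjacency list of (neighbor, weight) pairs.
    static int[] shortestDistances(List<int[]>[] graph, int source) {
        int[] dist = new int[graph.length];
        Arrays.fill(dist, Integer.MAX_VALUE);
        dist[source] = 0;
        // Priority queue of {node, distance}, ordered by distance
        PriorityQueue<int[]> pq =
            new PriorityQueue<>(Comparator.comparingInt(a -> a[1]));
        pq.add(new int[] {source, 0});
        while (!pq.isEmpty()) {
            int[] cur = pq.poll();
            if (cur[1] > dist[cur[0]]) continue; // stale queue entry
            for (int[] edge : graph[cur[0]]) {
                int next = edge[0], weight = edge[1];
                if (dist[cur[0]] + weight < dist[next]) {
                    dist[next] = dist[cur[0]] + weight;
                    pq.add(new int[] {next, dist[next]});
                }
            }
        }
        return dist;
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        List<int[]>[] graph = new List[4];
        for (int i = 0; i < graph.length; i++) graph[i] = new ArrayList<>();
        graph[0].add(new int[] {1, 4});
        graph[0].add(new int[] {2, 1});
        graph[2].add(new int[] {1, 2});
        graph[1].add(new int[] {3, 1});
        System.out.println(Arrays.toString(shortestDistances(graph, 0))); // [0, 3, 1, 4]
    }
}
```

Because each step pops the single closest unsettled node, the traversal is inherently sequential, which is exactly why this formulation is hard to parallelize.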
Problem
You need to use MapReduce to find the shortest path between two people in a social graph.
Solution
Use an adjacency list to model the graph, and for each node, store the distance from the original node, as well as a backpointer to the original node. Use the mappers to propagate the distance to the original node, and the reducers to restore the state of the graph. Iterate until the target node has been reached.
Discussion
Figure 7.8 shows a small social network that you’ll use for this technique. Your goal is to find the shortest path between Dee and Joe. There are four paths that you can take from Dee to Joe, but only one of them results in the fewest hops.
Figure 7.8. Social network used in this technique
You’ll implement a parallel breadth-first search algorithm to find the shortest path between two users. Because you’re operating on a social network, you don’t need to care about weights on your edges. The pseudocode for the algorithm can be seen in figure 7.9.
Figure 7.9. Pseudocode for breadth-first parallel search on graph using MapReduce
Figure 7.10 shows the algorithm iterations in play with your social graph. Just like Dijkstra’s algorithm, you’ll start with all the node distances set to infinite and set the distance for the starting node, Dee, at zero. With each MapReduce pass, you’ll determine nodes that don’t have an infinite distance and propagate their distance values to their adjacent nodes. You’ll continue this until you reach the end node.
Figure 7.10. Shortest path iterations through the network
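You can convince yourself of this iteration logic with an in-memory simulation. The following sketch (my own code, not the book’s MapReduce implementation) mimics the “map” pass that propagates distances and the “reduce” pass that keeps the minimum per node, using the friends graph from figure 7.8:

```java
import java.util.*;

public class ParallelBfsSimulation {
    public static Map<String, Integer> shortestHops(
            Map<String, List<String>> graph, String start) {
        final int INF = Integer.MAX_VALUE;
        Map<String, Integer> dist = new HashMap<>();
        for (String node : graph.keySet()) dist.put(node, INF);
        dist.put(start, 0);

        boolean changed = true;
        while (changed) {
            changed = false;
            // "Map" phase: each reached node proposes distance+1 to its neighbors
            Map<String, Integer> proposals = new HashMap<>();
            for (Map.Entry<String, List<String>> e : graph.entrySet()) {
                int d = dist.get(e.getKey());
                if (d == INF) continue;
                for (String adj : e.getValue()) {
                    proposals.merge(adj, d + 1, Math::min);
                }
            }
            // "Reduce" phase: each node keeps the minimum of its current
            // distance and any proposed distance
            for (Map.Entry<String, Integer> p : proposals.entrySet()) {
                if (p.getValue() < dist.get(p.getKey())) {
                    dist.put(p.getKey(), p.getValue());
                    changed = true;
                }
            }
        }
        return dist;
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new HashMap<>();
        graph.put("dee", Arrays.asList("kia", "ali"));
        graph.put("kia", Arrays.asList("dee", "bob"));
        graph.put("ali", Arrays.asList("dee", "bob", "joe"));
        graph.put("bob", Arrays.asList("kia", "ali", "joe"));
        graph.put("joe", Arrays.asList("bob", "ali"));
        System.out.println(shortestHops(graph, "dee").get("joe")); // 2
    }
}
```

Each pass of the while loop corresponds to one MapReduce job in the real implementation; the loop stops once no distance changes, which is when the frontier has reached every reachable node.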
You first need to create the starting point. This is done by reading in the social network (which is stored as an adjacency list) from the file and setting the initial distance values. Figure 7.11 shows the two file formats, the second being the format that’s used iteratively in your MapReduce code.
Figure 7.11. Original social network file format and MapReduce form optimized for algorithm
Your first step is to create the MapReduce form from the original file. The following listing shows the original input file and the MapReduce-ready form of the input file generated by the transformation code:
The code that generates the previous output is shown here:[1]
[1] GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch7/shortestpath/Main.java.
The structure of the MapReduce data isn’t changed across iterations of the algorithm; each job produces the same structure, which makes it easy to iterate, because the input format is the same as the output format.
Your map function will perform two major tasks. First, it outputs all the node data to preserve the original structure of the graph. If you didn’t do this, you couldn’t make this an iterative process, because the reducer wouldn’t be able to reproduce the original graph structure for the next map phase. The second task of the map is to output each adjacent node with its distance and a backpointer if the node has a non-infinite distance. The backpointer carries information about the nodes visited from the starting node, so when you reach the end node, you know the exact path that was taken to get there. Here’s the code for the map function:[2]
[2] GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch7/shortestpath/Map.java.
When outputting the original input node, as well as the adjacent nodes and the distances to them, the format (not the contents) of the map output value is kept identical to make it easier for your reducer to read the data. To do this, you use a Node class to model the notion of a node, its adjacent nodes, and the distance from the starting node. Its toString method generates a String form of this data, which is used as the map output value, as shown in the following listing.[3]
[3] GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch7/shortestpath/Node.java.
Listing 7.1. The Node class helps with serialization in MapReduce code
public class Node {
    public static final int INFINITE = Integer.MAX_VALUE;
    public static final char fieldSeparator = '\t';

    private int distance = INFINITE;
    private String backpointer;
    private String[] adjacentNodeNames;

    ...

    public String constructBackpointer(String name) {
        StringBuilder backpointer = new StringBuilder();
        if (StringUtils.trimToNull(getBackpointer()) != null) {
            backpointer.append(getBackpointer()).append(":");
        }
        backpointer.append(name);
        return backpointer.toString();
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(distance)
            .append(fieldSeparator)
            .append(backpointer);
        if (getAdjacentNodeNames() != null) {
            sb.append(fieldSeparator)
                .append(StringUtils
                    .join(getAdjacentNodeNames(), fieldSeparator));
        }
        return sb.toString();
    }

    public static Node fromMR(String value) throws IOException {
        String[] parts = StringUtils.splitPreserveAllTokens(
            value, fieldSeparator);
        if (parts.length < 2) {
            throw new IOException(
                "Expected 2 or more parts but received " + parts.length);
        }
        Node node = new Node()
            .setDistance(Integer.valueOf(parts[0]))
            .setBackpointer(StringUtils.trimToNull(parts[1]));
        if (parts.length > 2) {
            node.setAdjacentNodeNames(Arrays.copyOfRange(parts, 2,
                parts.length));
        }
        return node;
    }
}
The reducer is invoked for each node and is supplied a list of all the adjacent nodes and their shortest paths. It iterates through all the adjacent nodes and determines the current node’s shortest path by selecting the adjacent node with the smallest shortest path. The reducer then outputs the minimum distance, the backpointer, and the original adjacent nodes. The following listing shows this code.[4]
[4] GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch7/shortestpath/Reduce.java.
Listing 7.2. The reducer code for the shortest-path algorithm
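The full listing is available at the GitHub link above. Stripped of the Hadoop plumbing, its merge logic amounts to something like the following sketch, which operates on the tab-separated Node strings described earlier (a simplification of my own, not the book’s exact code):

```java
import java.util.*;

public class ShortestPathReduceSketch {
    static final int INFINITE = Integer.MAX_VALUE;

    // Merges all values emitted for one node: the value carrying the
    // adjacency list preserves the graph structure, while the others carry
    // candidate distances propagated from adjacent nodes.
    static String reduce(List<String> values) {
        int minDistance = INFINITE;
        String backpointer = "";
        String adjacents = null;
        for (String value : values) {
            // Format: distance <TAB> backpointer [<TAB> adjacent nodes...]
            String[] parts = value.split("\t", 3);
            int distance = Integer.parseInt(parts[0]);
            if (distance < minDistance) {
                minDistance = distance;
                backpointer = parts[1];
            }
            if (parts.length > 2) {
                adjacents = parts[2]; // this value came from the original node
            }
        }
        String result = minDistance + "\t" + backpointer;
        return adjacents == null ? result : result + "\t" + adjacents;
    }

    public static void main(String[] args) {
        // Node "ali": the original structure (infinite distance) plus a
        // distance-1 proposal propagated from "dee"
        List<String> values = Arrays.asList(
            INFINITE + "\t\tdee\tbob\tjoe",
            "1\tdee");
        // Prints the merged node: distance 1, backpointer dee, adjacency kept
        System.out.println(reduce(values));
    }
}
```

The key invariant is that the output has the same format as the input, so the next map iteration can consume it unchanged.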
You’re ready to run your code. You need to copy the input file into HDFS, and then kick off your MapReduce job, specifying the start node name (dee) and target node name (joe):
$ hadoop fs -put \
test-data/ch7/friends-short-path.txt \
friends-short-path.txt
$ hip hip.ch7.shortestpath.Main \
--start dee \
--end joe \
--input friends-short-path.txt \
--output output
==========================================
= Shortest path found, details as follows.
=
= Start node: dee
= End node: joe
= Hops: 2
= Path: dee:ali
==========================================
$ hadoop fs -cat output/2/part*
ali 1 dee dee bob joe
bob 2 dee:kia kia ali joe
dee 0 kia ali
joe 2 dee:ali bob ali
kia 1 dee bob dee
The output of your job shows that the minimum number of hops between Dee and Joe is 2, and that Ali was the connecting node.
Summary
This exercise showed how a shortest-path algorithm could be used to determine the minimum number of hops between two people in a social network. An algorithm related to the shortest-path algorithm, called graph diameter estimation, attempts to determine the average number of hops between nodes.[5] This has been used to support the notion of six degrees of separation in large social network graphs with millions of nodes.[6]
[5] See U. Kang et al., “HADI: Fast Diameter Estimation and Mining in Massive Graphs with Hadoop” (December 2008), http://reports-archive.adm.cs.cmu.edu/anon/ml2008/CMU-ML-08-117.pdf.
[6] See Lars Backstrom et al., “Four Degrees of Separation,” http://arxiv.org/abs/1111.4570.
Inefficiencies of using MapReduce for iterative graph processing
Using MapReduce for graph processing is inefficient from an I/O perspective: each graph iteration is executed within a single MapReduce job. As a result, the entire graph structure must be written to HDFS (in triplicate, or whatever your HDFS replication setting is) between jobs and then read back by the subsequent job. Graph algorithms that may require a large number of iterations (such as this shortest-path example) are best executed using Giraph, which is covered in section 7.1.4.
The shortest-path algorithm has multiple applications, but an arguably more useful and utilized algorithm in social networks is friends-of-friends (FoF).
7.1.3. Friends-of-friends algorithm
Social network sites such as LinkedIn and Facebook use the friends-of-friends (FoF) algorithm to help users broaden their networks.
Technique 68 Calculating FoFs
The friends-of-friends algorithm suggests friends that a user may know but who aren’t part of their immediate network. For this technique, we’ll consider a FoF to be in the second degree of separation, as shown in figure 7.12.
Figure 7.12. An example of FoF where Joe and Jon are considered FoFs to Jim
The key ingredient to success with this approach is to order the FoFs by the number of common friends, which increases the chances that the user knows the FoF.
Problem
You want to implement the FoF algorithm in MapReduce.
Solution
Two MapReduce jobs are required to calculate the FoFs for each user in a social network. The first job calculates the common friends for each user, and the second job sorts the common friends by the number of connections to your friends. You can then recommend new friends by selecting the top FoFs based on this sorted list.
Discussion
You should first look at an example graph and understand what results you’re looking for. Figure 7.13 shows a network of people with Jim, one of the users, highlighted. In this graph, Jim’s FoFs are in bold circles, and the number of friends that the FoF and Jim have in common is also identified.
Figure 7.13. A graph representing Jim’s FoFs
Your goal is to determine all the FoFs and order them by the number of friends in common. In this case, your expected results would have Joe as the first FoF recommendation, followed by Dee, and then Jon.
The text file that represents the social graph for this technique is shown here:
$ cat test-data/ch7/friends.txt
joe jon kia bob ali
kia joe jim dee
dee kia ali
ali dee jim bob joe jon
jon joe ali
bob joe ali jim
jim kia bob ali
This algorithm requires you to write two MapReduce jobs. The first job, the pseudo-code for which is shown in figure 7.14, calculates the FoFs and, for each FoF, counts the number of friends in common. The result of the job is a line for each FoF relationship, excluding people who are already friends.
Figure 7.14. The first MapReduce job, which calculates the FoFs
The output when you execute this job against the graph in figure 7.13 is shown here:
ali kia 3
bob dee 1
bob jon 2
bob kia 2
dee jim 2
dee joe 2
dee jon 1
jim joe 3
jim jon 1
jon kia 1
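Before scaling this out, you can sanity-check the first job’s logic with an in-memory equivalent. The following sketch (my own code, not listing 7.3) reproduces the counts above from the friends.txt graph; every pair of names in a user’s friend list shares that user as a common friend, unless the pair are already friends themselves:

```java
import java.util.*;

public class FoFCount {
    // For each user's friend list, every pair of friends in that list shares
    // the user as a common friend; pairs who are already direct friends are
    // dropped, because they aren't FoFs.
    public static Map<String, Integer> countCommonFriends(
            Map<String, List<String>> friends) {
        Map<String, Integer> counts = new TreeMap<>();
        for (List<String> list : friends.values()) {
            for (int i = 0; i < list.size(); i++) {
                for (int j = i + 1; j < list.size(); j++) {
                    String a = list.get(i), b = list.get(j);
                    if (friends.get(a) != null && friends.get(a).contains(b)) {
                        continue; // already direct friends, not FoFs
                    }
                    // Normalize the pair so (a,b) and (b,a) count together
                    String pair = a.compareTo(b) < 0 ? a + " " + b : b + " " + a;
                    counts.merge(pair, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, List<String>> friends = new HashMap<>();
        friends.put("joe", Arrays.asList("jon", "kia", "bob", "ali"));
        friends.put("kia", Arrays.asList("joe", "jim", "dee"));
        friends.put("dee", Arrays.asList("kia", "ali"));
        friends.put("ali", Arrays.asList("dee", "jim", "bob", "joe", "jon"));
        friends.put("jon", Arrays.asList("joe", "ali"));
        friends.put("bob", Arrays.asList("joe", "ali", "jim"));
        friends.put("jim", Arrays.asList("kia", "bob", "ali"));
        countCommonFriends(friends).forEach((pair, n) ->
            System.out.println(pair + " " + n));
    }
}
```

Running this prints the same ten pair counts shown above, in the same alphabetical order.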
The second job needs to produce output that lists FoFs in order of the number of common friends. Figure 7.15 shows the algorithm. You’re using a secondary sort to order each user’s FoFs by the number of common friends.
Figure 7.15. The second MapReduce job, which sorts the FoFs by the number of friends in common
The output of executing this job against the output of the previous job can be seen here:
ali kia:3
bob kia:2,jon:2,dee:1
dee jim:2,joe:2,jon:1,bob:1
jim joe:3,dee:2,jon:1
joe jim:3,dee:2
jon bob:2,kia:1,dee:1,jim:1
kia ali:3,bob:2,jon:1
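The second job’s ordering can likewise be sketched in-memory: group the pair counts by each person and sort that person’s FoFs by descending common-friend count. This sketch (my own code, a simplification that sorts in memory rather than with a MapReduce secondary sort) illustrates the end result:

```java
import java.util.*;
import java.util.stream.*;

public class FoFSort {
    // pairCounts maps "personA personB" -> number of common friends.
    // Produces, per person, their FoFs sorted by descending count.
    public static Map<String, String> sortFoFs(Map<String, Integer> pairCounts) {
        Map<String, Map<String, Integer>> perPerson = new TreeMap<>();
        for (Map.Entry<String, Integer> e : pairCounts.entrySet()) {
            String[] pair = e.getKey().split(" ");
            // Each pair contributes an FoF entry to both people
            perPerson.computeIfAbsent(pair[0], k -> new HashMap<>())
                .put(pair[1], e.getValue());
            perPerson.computeIfAbsent(pair[1], k -> new HashMap<>())
                .put(pair[0], e.getValue());
        }
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, Map<String, Integer>> e : perPerson.entrySet()) {
            String fofs = e.getValue().entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .map(f -> f.getKey() + ":" + f.getValue())
                .collect(Collectors.joining(","));
            result.put(e.getKey(), fofs);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> pairCounts = new LinkedHashMap<>();
        pairCounts.put("jim joe", 3);
        pairCounts.put("dee jim", 2);
        pairCounts.put("jim jon", 1);
        System.out.println(sortFoFs(pairCounts).get("jim")); // joe:3,dee:2,jon:1
    }
}
```

In the real job, the same descending-count ordering is achieved at scale by embedding the count in the map output key and letting the framework sort it, which is what the secondary-sort classes below enable.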
Let’s dive into the code. The following listing shows the first MapReduce job, which calculates the FoFs for each user.[7]
[7] GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch7/friendsofafriend/CalcMapReduce.java.
Listing 7.3. Mapper and reducer implementations for FoF calculation
The second MapReduce job, shown in the following listing, sorts the FoFs so that FoFs with a higher number of mutual friends appear ahead of those with a smaller number of mutual friends.[8]
[8] GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch7/friendsofafriend/SortMapReduce.java.
Listing 7.4. Mapper and reducer implementations that sort FoFs
I won’t show the whole driver code, but to enable the secondary sort, I had to write a few extra classes as well as inform the job to use the classes for partitioning and sorting purposes:[9]
[9] GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch7/friendsofafriend/Main.java.
job.setPartitionerClass(PersonNamePartitioner.class);
job.setSortComparatorClass(PersonComparator.class);
job.setGroupingComparatorClass(PersonNameComparator.class);
For more details on how secondary sort works, look at chapter 6.
Copy the input file containing the friend relationships into HDFS, and then run the driver code to run your two MapReduce jobs. The last two arguments are the output directories for the two MapReduce jobs:
$ hadoop fs -put test-data/ch7/friends.txt .
$ hip hip.ch7.friendsofafriend.Main \
--input friends.txt \
--calc-output outputcalc \
--sort-output outputsort
After running your code, you can look at the output in HDFS:
$ hadoop fs -cat outputsort/part*
ali kia:3
bob kia:2,jon:2,dee:1
dee jim:2,joe:2,jon:1,bob:1
jim joe:3,dee:2,jon:1
joe jim:3,dee:2
jon bob:2,kia:1,dee:1,jim:1
kia ali:3,bob:2,jon:1
This output verifies what you saw with your own eyes in figure 7.13. Jim has three FoFs, and they’re ordered by the number of common friends.
Summary
This approach can be used not only as a recommendation engine to help users grow their networks, but also for informational purposes when the user is browsing the social network’s website. For example, when you view people in LinkedIn, you’ll be shown the degrees of separation between you and the person being viewed. This approach can be used to precompute that information for two hops. To reproduce this for three hops (for example, to show friends-of-friends-of-friends) you’d need to introduce a third MapReduce job to compute the third hop from the output of the first job.
To simplify this approach, we used an undirected graph, which implies that user relationships are bidirectional. Most social networks don’t have such a notion, and the algorithm would need some minor tweaks to model directed graphs.
This example required two MapReduce jobs to complete the algorithm, which means that the entire graph was written to HDFS between jobs. This isn’t particularly inefficient given the number of jobs, but once the number of iterations over your graph data goes beyond two, it’s probably time to start looking at more efficient ways of working with your graph data. You’ll see this in the next technique, where Giraph will be used to calculate the popularity of web pages.
7.1.4. Using Giraph to calculate PageRank over a web graph
Using MapReduce for iterative graph processing introduces a number of inefficiencies, which are highlighted in figure 7.16.
Figure 7.16. An iterative graph algorithm implemented using MapReduce
This isn’t something that should be of concern if your graph algorithm only requires one or two iterations, but beyond that the successive HDFS barriers between jobs will start to add up, especially with large graphs. At that point it’s time to look at alternative methods for graph processing, such as Giraph.
This section presents an overview of Giraph and then applies it to calculate PageRank over a web graph. PageRank is a good fit for Giraph as it’s an example of an iterative graph algorithm that can require many iterations before the graph converges.
An Introduction to Giraph
Giraph is an Apache project modeled after Google’s Pregel, which describes a system for large-scale graph processing. Pregel was designed to reduce the inefficiencies of using MapReduce for graph processing and to provide a programming model that is vertex-centric.
To combat the disk and network barriers that exist in MapReduce, Giraph loads all the vertices into memory across a number of worker processes and keeps them in memory during the whole process. Each graph iteration is composed of the workers supplying inputs to the vertices that they manage, the vertices performing their processing, and the vertices then emitting messages that the framework routes to the appropriate adjacent vertices in the graph (as seen in figure 7.17).
Figure 7.17. Giraph message passing
Giraph uses the bulk synchronous parallel (BSP) model to coordinate communication between workers. BSP is essentially an iterative message-passing approach that uses a global synchronization barrier between successive iterations. Figure 7.18 shows Giraph workers each containing a number of vertices, and worker intercommunication and synchronization via the barrier.
Figure 7.18. Giraph workers, message passing, and synchronization
Technique 69 will go further into the details, but before we dive in, let’s take a quick look at how PageRank works.
A Brief Overview of PageRank
PageRank is a formula introduced by the founders of Google during their Stanford years in 1998.[10] Their paper discusses an overall approach to crawling and indexing the web, and it includes, as part of that, a calculation that they titled PageRank, which gives a score to each web page indicating the page’s importance. This wasn’t the first paper to introduce a scoring mechanism for web pages,[11] but it was the first to weight the scores propagated to each outbound link based on the total number of outbound links.
[10] See Sergey Brin and Lawrence Page, “The Anatomy of a Large-Scale Hypertextual Web Search Engine,” http://infolab.stanford.edu/pub/papers/google.pdf.
[11] Before PageRank, the HITS link-analysis method was popular; see the “Hubs and Authorities” page of Christopher D. Manning, Prabhakar Raghavan, and Hinrich Schütze, Introduction to Information Retrieval, http://nlp.stanford.edu/IR-book/html/htmledition/hubs-and-authorities-1.html.
Fundamentally, PageRank gives pages that have a large number of inbound links a higher score than pages that have a smaller number of inbound links. When evaluating the score for a page, PageRank uses the scores for all the inbound links to calculate a page’s PageRank. But it penalizes individual inbound links that have a high number of outbound links by dividing that outbound link PageRank by the number of outbound links. Figure 7.19 presents a simple example of a web graph with three pages and their respective PageRank values.
Figure 7.19. PageRank values for a simple web graph
Figure 7.20 shows the PageRank formula. In the formula, |webGraph| is a count of all the pages in the graph, and d, set to 0.85, is a constant damping factor used in two parts. First, it denotes the probability of a random surfer reaching the page after clicking on many links (this is a constant equal to 0.15 divided by the total number of pages), and second, it dampens the effect of the inbound link PageRanks by 85%.
Figure 7.20. The PageRank formula
Technique 69 Calculate PageRank over a web graph
PageRank is a graph algorithm that typically requires multiple iterations, and as such doesn’t lend itself to being implemented in MapReduce due to the disk barrier overhead discussed in this section’s introduction. This technique looks at how you can use Giraph, which is well-suited to algorithms that require multiple iterations over large graphs, to implement PageRank.
Problem
You want to implement an iterative PageRank graph algorithm using Giraph.
Solution
Use Giraph’s vertex-centric programming model. Each vertex computes its new PageRank value from the values messaged to it by its inbound neighbors, and then sends its updated PageRank (divided by the number of outbound edges) to its adjacent vertices. Iterate until the PageRank values converge or a maximum number of iterations is reached.
Discussion
One of the advantages of PageRank is that it can be computed iteratively and applied locally. Every vertex starts with a seed value, which is 1 divided by the number of nodes, and with each iteration, each node propagates its value to all pages it links to. Each vertex in turn sums up all the inbound vertex values to compute a new seed value. This iterative process is repeated until such a time as convergence is reached.
Convergence is a measure of how much the seed values have changed since the last iteration. If the convergence value is below a certain threshold, it means that there’s been minimal change and you can stop the iteration. It’s also common to limit the number of iterations for large graphs where convergence takes too many iterations.
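To make the iteration concrete, here’s a compact in-memory sketch of the PageRank loop (my own code, not the Giraph vertex from listing 7.5). Note that the exact values it converges to depend on the seed values and number of iterations, so they will differ slightly from the five-iteration Giraph run shown later in this technique, though the relative ordering of the vertices is the same:

```java
import java.util.*;

public class PageRankSketch {
    // outLinks[i] lists the vertices that vertex i links to.
    public static double[] pageRank(int[][] outLinks, double d, double epsilon) {
        int n = outLinks.length;
        double[] ranks = new double[n];
        Arrays.fill(ranks, 1.0 / n); // seed value: 1 divided by the number of nodes

        double change = Double.MAX_VALUE;
        while (change > epsilon) {
            // Each vertex propagates rank/outDegree to the pages it links to
            double[] incoming = new double[n];
            for (int v = 0; v < n; v++) {
                for (int dest : outLinks[v]) {
                    incoming[dest] += ranks[v] / outLinks[v].length;
                }
            }
            // New rank = (1 - d)/N + d * (sum of inbound contributions)
            change = 0;
            for (int v = 0; v < n; v++) {
                double updated = (1 - d) / n + d * incoming[v];
                change += Math.abs(updated - ranks[v]);
                ranks[v] = updated;
            }
        }
        return ranks;
    }

    public static void main(String[] args) {
        // The web graph used in this technique: A=0, B=1, C=2, D=3
        int[][] outLinks = {{1, 3}, {2}, {0, 1}, {1, 2}};
        double[] ranks = pageRank(outLinks, 0.85, 1e-6);
        System.out.println(Arrays.toString(ranks)); // C (vertex 2) ranks highest
    }
}
```

The total absolute change between iterations serves as the convergence measure described above; once it drops below the threshold, further iterations would barely move the scores.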
Figure 7.21 shows two iterations of PageRank against the simple graph you saw earlier in this chapter.
Figure 7.21. An example of PageRank iterations
Figure 7.22 shows the PageRank algorithm expressed as map and reduce phases. The map phase is responsible for preserving the graph as well as emitting the PageRank value to all the outbound nodes. The reducer is responsible for recalculating the new PageRank value for each node and including it in the output of the original graph.
Figure 7.22. PageRank decomposed into map and reduce phases
In this technique, you’ll operate on the graph shown in figure 7.23. In this graph, all the nodes have both inbound and outbound edges.
Figure 7.23. Sample web graph for this technique
Giraph supports various input and output data formats. For this technique we’ll use JsonLongDoubleFloatDoubleVertexInputFormat as the input format; it requires vertices to be expressed numerically along with an associated weight that we won’t use for this technique. We’ll map vertex A to integer 0, B to 1, and so on, and for each vertex we’ll identify the adjacent vertices. Each line in the data file represents a vertex and the directed edges to adjacent vertices:
[<vertex id>,<vertex value>,[[<dest vertex id>,<vertex weight>][...]]]
The following input file represents the graph in figure 7.23:
[0,0,[[1,0],[3,0]]]
[1,0,[[2,0]]]
[2,0,[[0,0],[1,0]]]
[3,0,[[1,0],[2,0]]]
Copy this data into a file called webgraph.txt and upload it to HDFS:
$ hadoop fs -put webgraph.txt .
Your next step is to write the Giraph vertex class. The nice thing about Giraph’s model is that it’s simple—it provides a vertex-based API where you need to implement the graph-processing logic for a single iteration on that vertex. The vertex class is responsible for processing incoming messages from adjacent vertices, using them to calculate the node’s new PageRank value, and propagating the updated PageRank value (divided by the number of outbound edges) to the adjacent vertices, as shown in the following listing.[12]
[12] GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch7/pagerank/giraph/PageRankVertex.java.
Listing 7.5. The PageRank vertex
Installing Giraph
Giraph is a Java library and is bundled with the code distribution for this book, so it doesn’t need to be installed for the examples in this technique to work. The Giraph website at http://giraph.apache.org/ provides downloads and installation instructions if you wish to explore Giraph further.
With the web graph in HDFS, you can run your job; it will execute five iterations until the graph converges:
After the process has completed, you can look at the output in HDFS to see the PageRank values for each vertex:
$ hadoop fs -cat output/part*
0 0.15472094578266
2 0.28902904137380575
1 0.25893832306149106
3 0.10043738978626424
According to the output, node C (vertex 2) has the highest PageRank, followed by node B (vertex 1). Initially, this observation may be surprising, given that B has three inbound links and C has just two. But if you look at who’s linking to C, you can see that node B, which also has a high PageRank value, only has one outbound link to C, so node C gets B’s entire PageRank score in addition to its other inbound PageRank score from node D. Therefore, node C’s PageRank will always be higher than B’s.
Summary
When you compare the code you had to write for the MapReduce version with the code for Giraph, it’s clear that Giraph provides a simple, abstracted model that richly expresses graph concepts. Giraph’s efficiency advantage over MapReduce makes it a compelling solution for your graph-processing needs.
Giraph’s ability to scale to large graphs is highlighted by a Facebook article discussing how Facebook used Giraph to process a graph with a trillion edges.[13] There are other graph technologies that you can evaluate for your needs:
[13] Avery Ching, “Scaling Apache Giraph to a trillion edges,” https://www.facebook.com/notes/facebook-engineering/scaling-apache-giraph-to-a-trillion-edges/10151617006153920.
· Faunus is a Hadoop-based open source project that supports HDFS and other data sources (http://thinkaurelius.github.io/faunus/).
· GraphX is an in-memory Spark-based project. GraphX is currently not supported by any of the commercial Hadoop vendors, although it will soon be included in Cloudera CDH 5.1 (https://amplab.cs.berkeley.edu/publication/graphx-grades/).
· GraphLab is a C++-based, distributed, graph-processing framework out of Carnegie Mellon University (http://graphlab.com/).
Although you implemented the PageRank formula, it was made simple by the fact that your graph was well connected and that every node had outbound links. Pages with no outbound links are called dangling pages, and they pose a problem for the PageRank algorithm because they become PageRank sinks: their PageRank values can’t be further propagated through the graph. This, in turn, causes convergence problems, because graphs that aren’t strongly connected aren’t guaranteed to converge.
There are various approaches to solving this problem. You could remove the dangling nodes before your PageRank iterations and then add them back for a final PageRank iteration after the graph has converged. Or you could sum together the PageRank totals for all dangling pages and redistribute them across all the nodes in the graph. For a detailed examination of dealing with dangling pages as well as advanced PageRank practices, see Google’s PageRank and Beyond by Amy N. Langville and Carl Dean Meyer (Princeton University Press, 2012).
This concludes the section on graphs. As you learned, graphs are useful mechanisms for representing people in a social network and pages in a web. You used these models to discover some useful information about your data, such as finding the shortest path between two points and what web pages are more popular than others.
This brings us to the subject of the next section, Bloom filters. Bloom filters are a different kind of data structure from graphs. Whereas graphs are used to represent entities and their relationships, Bloom filters are a mechanism for modeling sets and performing membership queries on their data, as you’ll discover next.
7.2. Bloom filters
A Bloom filter is a data structure that offers a membership query mechanism where the answer to a lookup is one of two values: a definitive no, meaning that the item being looked up doesn’t exist in the Bloom filter, or a maybe, meaning that there’s a probability that the item exists. Bloom filters are popular due to their space efficiency: representing the existence of N elements requires much less space than storing the N elements themselves, which is why membership queries can yield false positives. The false positive rate of a Bloom filter can be tuned, as we’ll discuss shortly.
Bloom filters are used in BigTable and HBase to remove the need to read blocks from disk to determine if they contain a key. They’re also used in distributed network applications such as Squid to share cache details between multiple instances without having to replicate the whole cache or incur a network I/O hit in the case of cache misses.
The implementation of Bloom filters is simple. They use a bit array of size m bits, where initially each bit is set to 0. They also contain k hash functions, which are used to map elements to k locations in the bit array.
To add an element to a Bloom filter, it’s hashed k times, and a modulo of each hashed value and the size of the bit array maps the hashed value to a specific bit array location. Each of those bits in the bit array is then set to 1. Figure 7.24 shows three elements being added to a Bloom filter and their locations in the bit array.
Figure 7.24. Adding elements to a Bloom filter
To check the membership of an element in the Bloom filter, just like with the add operation, the element is hashed k times, and each hash value is used to index into the bit array. A true response to the membership query is returned only when all k bit array locations are set to 1. Otherwise, the response to the query is false.
Figure 7.25 shows an example of a membership query where the item was previously added to the Bloom filter, and therefore all the bit array locations contained a 1. This is an example of a true positive membership result.
Figure 7.25. An example of a Bloom filter membership query that yields a true positive result
Figure 7.26 shows how you can get a false positive result for a membership query. The element being queried is d, which hadn’t been added to the Bloom filter. As it happens, all k hashes for d map to locations that were set to 1 by other elements. This is an example of a collision in the Bloom filter, where the result is a false positive.
Figure 7.26. An example of a Bloom filter membership query that yields a false positive result
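To make the add and query mechanics concrete, here’s a minimal standalone Bloom filter. This is a teaching sketch only: it is not Hadoop’s org.apache.hadoop.util.bloom.BloomFilter, and the seeded multiplicative hash is an illustrative stand-in for k independent hash functions:

```java
import java.util.BitSet;

public class ToyBloomFilter {
    private final BitSet bits; // the m-bit array, initially all zeros
    private final int m;       // number of bits
    private final int k;       // number of hash functions

    public ToyBloomFilter(int m, int k) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
    }

    // Stand-in for k independent hash functions: one multiplicative hash,
    // seeded k different ways, reduced modulo m to a bit-array index.
    private int hash(byte[] element, int seed) {
        int h = seed;
        for (byte b : element) {
            h = h * 31 + b;
        }
        return Math.floorMod(h, m);
    }

    // Add: hash the element k times and set each mapped bit to 1.
    public void add(byte[] element) {
        for (int i = 1; i <= k; i++) {
            bits.set(hash(element, i));
        }
    }

    // Query: "maybe present" only if all k mapped bits are 1; a single
    // 0 bit is a definitive "not present".
    public boolean mightContain(byte[] element) {
        for (int i = 1; i <= k; i++) {
            if (!bits.get(hash(element, i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ToyBloomFilter filter = new ToyBloomFilter(1024, 3);
        filter.add("anne".getBytes());
        System.out.println(filter.mightContain("anne".getBytes()));
        System.out.println(filter.mightContain("zzz".getBytes()));
    }
}
```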
The probability of false positives can be tuned based on two factors: m, the number of bits in the bit array, and k, the number of hash functions. Or expressed another way, if you have a desired false positive rate in mind and you know how many elements will be added to the Bloom filter, you can calculate the number of bits needed in the bit array with the equation in figure 7.27.
Figure 7.27. Equation to calculate the desired number of bits for a Bloom filter
The equation in figure 7.27 assumes an optimal number of hash functions, k, and that the hashes being produced are uniformly random over the range {1..m}. Figure 7.28 shows the equation for calculating that optimal number of hashes.
Figure 7.28. Equation to calculate the optimal number of hashes
Put another way, if you want to add 1 million elements into a Bloom filter with a 1% false positive rate for your membership queries, you’ll need 9,585,058 bits or 1.2 megabytes with seven hash functions. This is around 9.6 bits for each element.
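These sizing formulas are easy to check in code. The sketch below applies m = -n ln(p) / (ln 2)^2 and k = (m / n) ln 2, the standard optimal-sizing equations shown in figures 7.27 and 7.28, and reproduces the numbers from the worked example above:

```java
public class BloomSizing {

    // m = -n * ln(p) / (ln 2)^2: bits needed for n elements at false
    // positive rate p, assuming the optimal number of hash functions.
    static long optimalBits(long n, double p) {
        return Math.round(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    // k = (m / n) * ln 2: the optimal number of hash functions.
    static int optimalHashes(long m, long n) {
        return (int) Math.round((double) m / n * Math.log(2));
    }

    public static void main(String[] args) {
        long n = 1_000_000;
        double p = 0.01;
        long m = optimalBits(n, p);
        int k = optimalHashes(m, n);
        System.out.println(m + " bits, " + k + " hashes, "
                + (double) m / n + " bits per element");
    }
}
```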
Table 7.1 shows the calculated number of bits per element for various false positive rates.
Table 7.1. Number of bits required per element for different false positive rates
False positives     Bits required per element
2%                  8.14
1%                  9.58
0.1%                14.38
With all that theory in your head, you now need to turn your attention to the subject of how Bloom filters can be utilized in MapReduce.
Technique 70 Parallelized Bloom filter creation in MapReduce
MapReduce is good for processing large amounts of data in parallel, so it’s a good fit if you want to create a Bloom filter based on a large set of input data. For example, let’s say you’re a large internet social-media organization with hundreds of millions of users, and you want to create a Bloom filter for the subset of users within a certain age demographic. How would you do this in MapReduce?
Problem
You want to create a Bloom filter in MapReduce.
Solution
Write a MapReduce job to create and output a Bloom filter using Hadoop’s built-in BloomFilter class. The mappers are responsible for creating intermediary Bloom filters, and a single reducer combines them to output a final Bloom filter.
Discussion
Figure 7.29 shows what this technique will do. You’ll write a mapper, which will process user data and create a Bloom filter containing users in a certain age bracket. The mappers will emit their Bloom filters, and a single reducer will combine them together. The final result is a single Bloom filter stored in HDFS in Avro form.
Figure 7.29. A MapReduce job to create a Bloom filter
Hadoop comes bundled with an implementation of a Bloom filter in the form of the org.apache.hadoop.util.bloom.BloomFilter class, illustrated in figure 7.30. Luckily, it’s a Writable, which makes it easy to ship around in MapReduce. The Key class is used to represent an element, and it is also a Writable container for a byte array.
Figure 7.30. The BloomFilter class in MapReduce
The constructor requires that you tell it which hashing function to use. There are two implementations you can choose from: Jenkins and Murmur. Both are faster than cryptographic hashers such as SHA-1, and both produce good distributions. Benchmarks indicate that Murmur has faster hashing times than Jenkins, so that’s what we’ll use here.
Let’s press on with the code. Your map function will operate on your user information, which is a simple key/value pair, where the key is the user name and the value is the user’s age:[14]

[14] GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch7/bloom/BloomFilterCreator.java.
Why do you output the Bloom filter in the close method, rather than outputting it for every record you process in the map method? You do this to cut down on the amount of traffic between the map and reduce phases; there’s no reason to output a lot of data if you can pseudo-combine the filters yourself on the map side and emit a single BloomFilter per map task.
Your reducer’s job is to combine all the Bloom filters emitted by the mappers into a single Bloom filter. The unions are performed with the bitwise OR method exposed by the BloomFilter class. When performing a union, all the BloomFilter attributes, such as bit array size and number of hashes, must be identical:[15]

[15] GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch7/bloom/BloomFilterCreator.java.
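Stripped of the Hadoop types, the union the reducer performs is just a bitwise OR over equally sized bit arrays. Here’s a standalone sketch using java.util.BitSet (Hadoop’s BloomFilter applies the equivalent operation to its internal bit array):

```java
import java.util.BitSet;

public class BloomUnion {

    // Union two Bloom filter bit arrays. The result answers membership
    // queries for the union of the two element sets, but only if both
    // filters used the same bit array size and the same k hash functions.
    static BitSet union(BitSet left, BitSet right) {
        BitSet result = (BitSet) left.clone();
        result.or(right); // bitwise OR, leaving the inputs untouched
        return result;
    }

    public static void main(String[] args) {
        BitSet a = new BitSet(16);
        a.set(1);
        a.set(5);
        BitSet b = new BitSet(16);
        b.set(5);
        b.set(9);
        System.out.println(union(a, b)); // bits 1, 5, and 9 are set
    }
}
```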
To try this out, upload your sample user file and kick off your job. When the job is complete, dump the contents of the Avro file to view the contents of your BloomFilter:
$ hadoop fs -put test-data/ch7/user-ages.txt .
$ hadoop fs -cat user-ages.txt
anne 23
joe 45
alison 32
mike 18
marie 54
$ hip hip.ch7.bloom.BloomFilterCreator \
--input user-ages.txt \
--output output
$ hip hip.ch7.bloom.BloomFilterDumper output/part-00000.avro
{96, 285, 292, 305, 315, 323, 399, 446, 666, 667, 670,
703, 734, 749, 810}
The BloomFilterDumper code unmarshals the BloomFilter from the Avro file and calls the toString() method, which in turn calls the BitSet.toString() method, which outputs the offset for each bit that is “on.”
Summary
You used Avro as a serialization format for the Bloom filter. You could have just as easily emitted the BloomFilter object in your reducer, because it’s a Writable.
You used a single reducer in this technique, which will scale well to jobs that use thousands of map tasks and BloomFilters whose bit array sizes are in the millions. If the time taken to execute the single reducer becomes too long, you can run with multiple reducers to parallelize the Bloom filter unions, and have a postprocessing step to combine them further into a single Bloom filter.
Another distributed method for creating a Bloom filter would be to view the set of reducers as the overall bit array, and perform the hashing and output the hashes in the map phase. The partitioner would then partition the output to the relevant reducer that manages that section of the bit array. Figure 7.31 illustrates this approach.
Figure 7.31. An alternate architecture for creating Bloom filters
For code comprehensibility, you hardcoded the BloomFilter parameters in this technique; in reality, you’ll want to either calculate them dynamically or move them into a configuration file.
This technique resulted in the creation of a BloomFilter. This BloomFilter could be pulled out of HDFS and used in another system, or it could be used directly in Hadoop, as shown in technique 61, where a Bloom filter was used as a way to filter data emitted from reducers in joins.
7.3. HyperLogLog
Imagine that you’re building a web analytics system where one of the data points you’re calculating is the number of unique users that have visited a URL. Your problem domain is web-scale, so you have hundreds of millions of users. A naive MapReduce implementation of aggregation would involve using a hashtable to store and calculate the unique users, but this could exhaust your JVM heap when dealing with a large number of users. A more sophisticated solution would use a secondary sort so that user IDs are sorted, and the grouping occurs at the URL level so that you can count unique users without any storage overhead.
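The secondary-sort approach works because once the user IDs for a URL arrive in sorted order, equal IDs are adjacent, so distinct counting needs no storage beyond the previous value. A standalone sketch of that counting step (the method name is illustrative):

```java
import java.util.Arrays;

public class SortedDistinct {

    // Count distinct values in a stream that arrives in sorted order.
    // Equal values are adjacent, so only the previous value must be kept.
    static long countDistinctSorted(Iterable<String> sortedIds) {
        long count = 0;
        String prev = null;
        for (String id : sortedIds) {
            if (!id.equals(prev)) { // a transition marks a new distinct value
                count++;
                prev = id;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(
            countDistinctSorted(Arrays.asList("anne", "anne", "joe", "mike"))); // prints 3
    }
}
```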
These solutions work well when you have the ability to process the entire dataset at once. But if you have a more complex aggregation system where you create aggregations in time buckets and you need to combine buckets together, then you’d need to store the entire set of unique users for each URL in each time bucket, which would explode your data storage needs.
To combat this, you could use a probabilistic algorithm such as HyperLogLog, which has a significantly smaller memory footprint than a hashtable. The trade-off with these probabilistic data structures is accuracy, which you can tune. In some ways, HyperLogLog is similar to a Bloom filter, but the key difference is that HyperLogLog will estimate a count, whereas a Bloom filter only provides membership capabilities.
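Part of what makes HyperLogLog attractive for the time-bucket scenario above is that two sketches can be merged losslessly by taking the element-wise maximum of their registers, provided both were built with the same register count and hash function. A standalone sketch of that merge:

```java
public class HllMerge {

    // Merge two HyperLogLog register arrays by element-wise maximum. The
    // merged sketch estimates the cardinality of the union of the two
    // underlying sets, with no loss of accuracy relative to a single
    // sketch built over all the data.
    static byte[] merge(byte[] r1, byte[] r2) {
        if (r1.length != r2.length) {
            throw new IllegalArgumentException("register counts must match");
        }
        byte[] merged = new byte[r1.length];
        for (int i = 0; i < r1.length; i++) {
            merged[i] = (byte) Math.max(r1[i], r2[i]);
        }
        return merged;
    }

    public static void main(String[] args) {
        byte[] monday = {1, 4, 0};
        byte[] tuesday = {2, 3, 5};
        System.out.println(java.util.Arrays.toString(merge(monday, tuesday))); // [2, 4, 5]
    }
}
```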
In this section you’ll learn how HyperLogLog works and see how it can be used in MapReduce to efficiently calculate unique counts.
7.3.1. A brief introduction to HyperLogLog
HyperLogLog was first introduced in a 2007 paper to “estimate the number of distinct elements of very large data ensembles.”[16] Potential applications include link-based spam detection on the web and data mining over large datasets.

[16] Philippe Flajolet et al., “HyperLogLog: the analysis of a near-optimal cardinality estimation algorithm,” http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf.
HyperLogLog is a probabilistic cardinality estimator—it relaxes the constraint of exactly calculating the number of elements in a set and instead estimates the number of elements. Data structures that support exact set-cardinality calculations require storage that is proportional to the number of elements, which may not be optimal when working with large datasets. Probabilistic cardinality structures occupy less memory than their exact cardinality counterparts, and they’re applicable in situations where the cardinality can be off by a few percentage points.
HyperLogLog can perform cardinality estimation for counts beyond 10^9 using 1.5 KB of memory with an error rate of 2%. HyperLogLog works by tracking the maximum number of consecutive leading zeros seen in the hashed values of the items, and using probabilities to estimate the cardinality of the set of unique items. Figure 7.32 shows how a hashed value is represented in HyperLogLog. For additional details, refer to the HyperLogLog paper.
Figure 7.32. How HyperLogLog works
There are two parameters you’ll need to tune when working with HyperLogLog:
· The number of buckets, usually expressed as a number b, which determines the number of buckets, 2^b. Each increment of b therefore doubles the number of buckets. The lower bound on b is 4, and the upper bound varies by implementation.
· The number of bits used to represent the maximum number of consecutive zeros in a bucket.
As a result, the size of the HyperLogLog structure is calculated by 2^b * bits-per-bucket. In typical usage, b is 11 and the number of bits per bucket is 5, which results in 10,240 bits, or 1.25 KB.
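The bucketing just described can be sketched in a few lines. In this illustrative fragment (not the implementation used later in this chapter), the top b bits of a 64-bit hash select a register, and each register retains the highest rank (the position of the first 1-bit in the remaining bits) observed so far:

```java
public class HllSketch {
    private final int b;           // 2^b registers
    private final byte[] registers;

    public HllSketch(int b) {
        this.b = b;
        this.registers = new byte[1 << b];
    }

    // Feed in a 64-bit hash of an element. The top b bits choose a
    // register; the register keeps the maximum rank (count of leading
    // zeros in the remaining bits, plus one) seen so far.
    public void offerHash(long hash) {
        int bucket = (int) (hash >>> (64 - b));
        long rest = hash << b;
        // Note: a real implementation caps the rank; if rest is all zeros
        // this yields 65, which a 5-bit register could not hold.
        int rank = Long.numberOfLeadingZeros(rest) + 1;
        if (rank > registers[bucket]) {
            registers[bucket] = (byte) rank;
        }
    }

    public byte register(int bucket) {
        return registers[bucket];
    }

    public static void main(String[] args) {
        HllSketch sketch = new HllSketch(11);
        sketch.offerHash(1L << 52); // bucket 0, rank 1
        sketch.offerHash(1L << 50); // bucket 0, rank 3
        System.out.println(sketch.register(0)); // prints 3
    }
}
```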
Technique 71 Using HyperLogLog to calculate unique counts
In this technique you’ll see a simple example of HyperLogLog in action. The summary will present some details on how HyperLogLog can be incorporated into your MapReduce flows.
Problem
You’re working with a large dataset and you want to calculate distinct counts. You are willing to accept a small percentage of error.
Solution
Use HyperLogLog.
Discussion
For this technique you’ll use a HyperLogLog Java implementation from a GitHub project called java-hll (https://github.com/aggregateknowledge/java-hll). This code provides the basic HyperLogLog functions, in addition to useful functions for computing the union and intersection of multiple logs.
The following example shows a simple case where your data consists of an array of numbers, and Google’s Guava library is used to create a hash for each number and add it to the HyperLogLog:[17]

[17] GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch7/hyperloglog/Example.java.
Running this example yields the expected number of distinct items:
$ hip hip.ch7.hyperloglog.Example
Distinct count = 5
This code can be easily adapted into a Hadoop job to perform distinct counts over large datasets. For example, imagine that you’re writing a MapReduce job to calculate the distinct number of users that visit each page on your website. In MapReduce, your mappers would output a URL and user ID as the key and value respectively, and your reducers would need to calculate the unique set of users for each web page. In this situation, you can use a HyperLogLog structure to efficiently calculate an approximate distinct count of users without the overhead that would be incurred by using a hash set.
Summary
The hll HyperLogLog implementation used in this example has a toBytes method that you can use to serialize the HyperLogLog, and it also has a fromBytes method for deserialization. This makes it relatively straightforward to use within a MapReduce flow and for persistence. Avro, for example, has a bytes field that you can use to write the hll byte form into your records. You could also write your own Writable if you’re working with SequenceFiles.
If you’re using Scalding or Summingbird, then Algebird has a HyperLogLog implementation that you can use—take a look at https://github.com/twitter/algebird for more details.
7.4. Chapter summary
Most of the algorithms laid out in this chapter are straightforward. What makes things interesting is how they’re applied in MapReduce in ways that enable you to work efficiently with large datasets.
The two main data structures presented were graphs—good for modeling relationships—and Bloom filters, which excel at compact set membership. In the case of graphs, we looked at how you would use them to model social networks and web graphs, and we went through some algorithms such as FoF and PageRank to mine some interesting facts about your data.
In the case of Bloom filters, we looked at how to use MapReduce to create a Bloom filter in parallel, and then apply that Bloom filter to optimize a semi-join operation in MapReduce.
We’ve only scratched the surface in this chapter regarding how data can be modeled and processed. Algorithms related to sorting and joins are covered in other chapters. The next chapter covers techniques to diagnose and tune Hadoop to squeeze as much performance as you can out of your clusters.