Field Guide to Hadoop (2015)

Chapter 2. Database and Data Management

If you’re planning to use Hadoop, it’s likely that you’ll be managing lots of data, and in addition to MapReduce jobs, you may need some kind of database. Ever since the advent of Google’s BigTable, the Hadoop community has taken a strong interest in data management. While there are some relational SQL databases or SQL interfaces to HDFS data, like Hive, much data management in Hadoop uses non-SQL techniques to store and access data. The NoSQL Archive lists more than 150 NoSQL databases, classified as:

§  Column stores

§  Document stores

§  Key-value/tuple stores

§  Graph databases

§  Multimodel databases

§  Object databases

§  Grid and cloud databases

§  Multivalue databases

§  Tabular stores

§  Others

NoSQL databases generally do not support relational join operations, complex transactions, or foreign-key constraints common in relational systems but generally scale better to large amounts of data. You’ll have to decide what works best for your datasets and the information you wish to extract from them. It’s quite possible that you’ll be using more than one.

This book will look at many of the leading examples in each section, but the focus will be on the two major categories: key-value stores and document stores (illustrated in Figure 2-1).

Figure 2-1. Two approaches to indexing

A key-value store can be thought of like a catalog. All the items in a catalog (the values) are organized around some sort of index (the keys). Just like a catalog, a key-value store is very quick and effective if you know the key you’re looking for, but isn’t a whole lot of help if you don’t.

For example, let’s say I’m looking for Marshall’s review of The Godfather. I can quickly refer to my index, find all the reviews for that film, and scroll down to Marshall’s review: “I prefer the book…”

A document warehouse, on the other hand, is a much more flexible type of database. Rather than forcing you to organize your data around a specific key, it allows you to index and search for your data based on any number of parameters. Let’s expand on the last example and say I’m in the mood to watch a movie based on a book. One naive way to find such a movie would be to search for reviews that contain the word “book.”

In this case, a key-value store wouldn’t be a whole lot of help, as my key is not very clearly defined. What I need is a document warehouse that will let me quickly search all the text of all the reviews and find those that contain the word “book.”
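The difference between the two access patterns can be sketched in a few lines of Python. This is only an illustration: the review texts (other than Marshall’s) and the function names are made up for the example, and a real document warehouse would use an index rather than scanning every review.

```python
# Toy data: reviews keyed by (title, reviewer). Texts are illustrative only.
reviews = {
    ("The Godfather", "Marshall"): "I prefer the book...",
    ("The Godfather", "Kevin"): "A masterpiece of pacing.",
    ("Dune", "Kevin"): "Faithful to the book in spirit.",
}


def lookup(title, reviewer):
    """Key-value access: one direct lookup when the full key is known."""
    return reviews.get((title, reviewer))


def search(word):
    """Document-style access: find every review whose text contains a word."""
    word = word.lower()
    return sorted(key for key, text in reviews.items() if word in text.lower())


print(lookup("The Godfather", "Marshall"))
print(search("book"))
```

The lookup needs the exact key, while the search needs no key at all but has to examine the text of every review.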

Cassandra

License

Apache License, Version 2.0

Activity

High

Purpose

Key-value store

Official Page

https://cassandra.apache.org

Hadoop Integration

API Compatible

Oftentimes you may need to simply organize some of your big data for easy retrieval. One common way to do this is to use a key-value datastore. This type of database looks like the white pages in a phone book. Your data is organized by a unique “key,” and values are associated with that key. For example, if you want to store information about your customers, you may use their username as the key, and information such as transaction history and addresses as values associated with that key.

Key-value datastores are a common fixture in any big data system because they are easy to scale, quick, and straightforward to work with. Cassandra is a distributed key-value database designed with simplicity and scalability in mind. While often compared to HBase (described here), Cassandra differs in a few key ways:

§  Cassandra is an all-inclusive system, which means it does not require a Hadoop environment or any other big data tools.

§  Cassandra is completely masterless: it operates as a peer-to-peer system. This makes it easier to configure and highly resilient.

Tutorial Links

DataStax, a company that provides commercial support for Cassandra, offers a set of freely available videos.

Example Code

The easiest way to interact with Cassandra is through its shell interface. You start the shell by running bin/cqlsh from your install directory.

Then you need to create a keyspace. Keyspaces are similar to schemas in traditional relational databases; they are a convenient way to organize your tables. A typical pattern is to use a separate keyspace for each application:

CREATE KEYSPACE field_guide

WITH REPLICATION = {

    'class': 'SimpleStrategy', 'replication_factor' : 3 };

USE field_guide;

Now that you have a keyspace, you’ll create a table within that keyspace to hold your reviews. This table will have three columns and a primary key that consists of both the reviewer and the title, as that pair should be unique within the database:

CREATE TABLE reviews (

    reviewer varchar,

    title varchar,

    rating int,

    PRIMARY KEY (reviewer, title));

Once your table is created, you can insert a few reviews:

INSERT INTO reviews (reviewer,title,rating)

    VALUES ('Kevin','Dune',10);

INSERT INTO reviews (reviewer,title,rating)

    VALUES ('Marshall','Dune',1);

INSERT INTO reviews (reviewer,title,rating)

    VALUES ('Kevin','Casablanca',5);

And now that you have some data, you will create an index that will allow you to execute a simple SQL-like (CQL) query to retrieve Dune reviews:

CREATE INDEX ON reviews (title);

SELECT * FROM reviews WHERE title = 'Dune';

 reviewer |      title | rating

----------+------------+-------

    Kevin |       Dune |     10

 Marshall |       Dune |      1
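To see why the index is needed before querying by title, it can help to model the table in plain Python. The sketch below is a toy model, not Cassandra’s actual storage engine: it assumes the first primary-key column (reviewer) chooses the partition and the second (title) identifies a row within it, and all names are invented for the example.

```python
from collections import defaultdict

rows = [("Kevin", "Dune", 10), ("Marshall", "Dune", 1), ("Kevin", "Casablanca", 5)]

# PRIMARY KEY (reviewer, title): the reviewer picks the partition,
# and the title identifies a row inside that partition.
partitions = defaultdict(dict)
for reviewer, title, rating in rows:
    partitions[reviewer][title] = rating

# A secondary index on title maps each title to the reviewers who rated it,
# so a query by title does not have to visit every partition.
title_index = defaultdict(set)
for reviewer, title, _ in rows:
    title_index[title].add(reviewer)


def reviews_for_title(title):
    """Answer WHERE title = ... by consulting the index first."""
    reviewers = title_index.get(title, set())
    return sorted((r, title, partitions[r][title]) for r in reviewers)


print(reviews_for_title("Dune"))
```

Looking up all of Kevin’s reviews is a single partition read, while looking up all reviews of Dune goes through the index.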

HBase

License

Apache License, Version 2.0

Activity

High

Purpose

NoSQL database with random access

Official Page

https://hbase.apache.org

Hadoop Integration

Fully Integrated

There are many situations in which you might have sparse data. That is, there are many attributes of the data, but each observation only has a few of them. For example, you might want a table of various tickets in a help-desk application. Tickets for email might have different information (and attributes or columns) than tickets for network problems, lost passwords, or issues with the backup system. There are other situations in which you have data that has a large number of common values in a column or attribute, say “country” or “state.” Each of these examples might lead you to consider HBase.

HBase is a NoSQL database system included in the standard Hadoop distributions. It is a key-value store, logically. This means that rows are defined by a key, and have associated with them a number of bins (or columns) where the associated values are stored. The only data type is the byte string. Physically, groups of similar columns are stored together in column families. Most often, HBase is accessed via Java code, but APIs exist for using HBase with Pig, Thrift, Jython (Python based), and others. HBase is not normally accessed in a MapReduce fashion. It does have a shell interface for interactive use.

HBase is often used for applications that may require sparse rows. That is, each row may use only a few of the defined columns. It is fast (as Hadoop goes) when access to elements is done through the primary key, or defining key value. It’s highly scalable and reasonably fast. Unlike traditional HDFS applications, it permits random access to rows, rather than sequential searches.

Though HBase is faster than MapReduce, you should not use it for any kind of transactional needs, nor for any kind of relational analytics. It does not support any secondary indexes, so finding all rows where a given column has a specific value is tedious and must be done at the application level. HBase does not have a JOIN operation; this must be done by the individual application. You must provide security at the application level; other tools like Accumulo (described here) are built with security in mind.

While Cassandra (described here) and MongoDB (described here) might still be the predominant NoSQL databases today, HBase is gaining in popularity and may well be the leader in the near future.

Tutorial Links

The folks at Coreservlets.com have put together a handful of Hadoop tutorials including an excellent series on HBase. There’s also a handful of video tutorials available on the Internet, including this one, which we found particularly helpful.

Example Code

In this example, your goal is to find the average review for the movie Dune. Each movie review has three elements: a reviewer name, a film title, and a rating (an integer from 0 to 10). The example is done in the HBase shell:

hbase(main):008:0> create 'reviews', 'cf1'

0 row(s) in 1.0710 seconds

hbase(main):013:0> put 'reviews', 'dune-marshall', \

hbase(main):014:0> 'cf1:score', 1

0 row(s) in 0.0370 seconds

hbase(main):015:0> put 'reviews', 'dune-kevin', \

hbase(main):016:0> 'cf1:score', 10

0 row(s) in 0.0090 seconds

hbase(main):017:0> put 'reviews', 'casablanca-kevin', \

hbase(main):018:0> 'cf1:score', 5

0 row(s) in 0.0130 seconds

hbase(main):019:0> put 'reviews', 'blazingsaddles-b0b', \

hbase(main):020:0> 'cf1:score', 9

0 row(s) in 0.0090 seconds

hbase(main):021:0> scan 'reviews'

ROW                          COLUMN+CELL

 blazingsaddles-b0b          column=cf1:score,

                             timestamp=1390598651108,

                             value=9

 casablanca-kevin            column=cf1:score,

                             timestamp=1390598627889,

                             value=5

 dune-kevin                  column=cf1:score,

                             timestamp=1390598600034,

                             value=10

 dune-marshall               column=cf1:score,

                             timestamp=1390598579439,

                             value=1

4 row(s) in 0.0290 seconds

hbase(main):024:0> scan 'reviews', {STARTROW => 'dune', \

hbase(main):025:0> ENDROW => 'dunf'}

ROW                          COLUMN+CELL

 dune-kevin                  column=cf1:score,

                             timestamp=1390598791384,

                             value=10

 dune-marshall               column=cf1:score,

                             timestamp=1390598579439,

                             value=1

2 row(s) in 0.0090 seconds

Now you’ve retrieved the two rows using an efficient range scan, but how do you compute the average? In the HBase shell, it’s not possible; using the HBase Java APIs, you can extract the values, but there is no built-in row aggregation function for average or sum, so you would need to do this in your Java code.

The choice of the row key is critical in HBase. If you want to find the average rating of all the movies Kevin has reviewed, you would need to do a full table scan, potentially a very tedious task with a very large dataset. You might want to have two versions of the table, one with the row key given by reviewer-film and another with film-reviewer. Then you would have the problem of ensuring they’re in sync.
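The effect of row-key design can be sketched outside HBase. The Python below is a toy model that assumes only what the shell session shows: rows are kept sorted by key, and a range scan returns keys from STARTROW up to (but not including) ENDROW. The function names are invented for the example.

```python
from bisect import bisect_left

# Rows sorted by key, following the film-reviewer key pattern used above.
table = sorted({
    "blazingsaddles-b0b": 9,
    "casablanca-kevin": 5,
    "dune-kevin": 10,
    "dune-marshall": 1,
}.items())
keys = [key for key, _ in table]


def range_scan(start_row, end_row):
    """Return rows with start_row <= key < end_row using binary search."""
    lo = bisect_left(keys, start_row)
    hi = bisect_left(keys, end_row)
    return table[lo:hi]


def full_scan(predicate):
    """Fallback when the key order does not help: look at every row."""
    return [(key, score) for key, score in table if predicate(key)]


# Average for Dune: the film is the key prefix, so a range scan is enough.
dune = range_scan("dune", "dunf")
dune_average = sum(score for _, score in dune) / len(dune)

# Average for Kevin: the reviewer is the key suffix, so every row is examined.
kevin = full_scan(lambda key: key.endswith("-kevin"))
kevin_average = sum(score for _, score in kevin) / len(kevin)

print(dune_average, kevin_average)
```

The averaging happens in the client code in both cases; the row key only decides how many rows have to be read to get there.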

Accumulo

License

Apache License, Version 2.0

Activity

High

Purpose

Name-value database with cell-level security

Official Page

http://accumulo.apache.org/index.html

Hadoop Integration

Fully Integrated

You have an application that could use a good column/name-value store, like HBase (described here), but you have an additional security issue; you must carefully control which users can see which cells in your data. For example, you could have a multitenancy data store in which you are storing data from different divisions in your enterprise in a single table and want to ensure that users from one division cannot see the data from another, but that senior management can see across the whole enterprise. For internal security reasons, the U.S. National Security Agency (NSA) developed Accumulo and then donated the code to the Apache foundation.

You might notice a great deal of similarity between HBase and Accumulo, as both systems are modeled on Google’s BigTable. Accumulo improves on that model with its focus on security and cell-based access control. Each user has a set of security labels, simple text strings. Suppose yours were “admin,” “audit,” and “GroupW.” When you want to define the access to a particular cell, you set the column visibility for that column in a given row to a Boolean expression of the various labels. In this syntax, the & is logical AND and | is logical OR. If the cell’s visibility rule were admin|audit, then any user with either the admin or the audit label could see that cell. If the column visibility rule were admin&Group7, you would not be able to see it, as you lack the Group7 label, and both are required.
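The label check described above can be sketched as a small expression evaluator. This is a toy model written for illustration, not Accumulo’s API: the function name is invented, and the sketch assumes that & binds more tightly than | when parentheses are absent, which may not match the rules Accumulo itself enforces.

```python
import re


def can_see(expression, user_labels):
    """Evaluate a visibility expression such as 'admin|audit' for one user."""
    tokens = re.findall(r"[A-Za-z0-9_]+|[&|()]", expression)
    pos = 0

    def parse_or():
        nonlocal pos
        result = parse_and()
        while pos < len(tokens) and tokens[pos] == "|":
            pos += 1
            rhs = parse_and()  # always parse, even if result is already True
            result = result or rhs
        return result

    def parse_and():
        nonlocal pos
        result = parse_term()
        while pos < len(tokens) and tokens[pos] == "&":
            pos += 1
            rhs = parse_term()  # always parse, even if result is already False
            result = result and rhs
        return result

    def parse_term():
        nonlocal pos
        token = tokens[pos]
        pos += 1
        if token == "(":
            result = parse_or()
            pos += 1  # skip the closing ")"
            return result
        return token in user_labels

    return parse_or()


my_labels = {"admin", "audit", "GroupW"}
print(can_see("admin|audit", my_labels))
print(can_see("admin&Group7", my_labels))
```

With the labels from the text, the first rule grants access and the second denies it because the Group7 label is missing.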

But Accumulo is more than just security. It can also run at massive scale, handling many petabytes of data and hundreds of thousands of ingest and retrieval operations per second.

Tutorial Links

For more information on Accumulo, check out the following resources:

§  An introduction from Aaron Cordova, one of the originators of Accumulo.

§  A video tutorial that focuses on performance and the Accumulo architecture.

§  This tutorial is more focused on security and encryption.

§  The 2014 Accumulo Summit has a wealth of information.

Example Code

Good example code is a bit long and complex to include here, but can be found in the “Examples” section of the project’s home page.

Memcached

License

Revised BSD License

Activity

Medium

Purpose

In-Memory Cache

Official Page

http://memcached.org

Hadoop Integration

No Integration

It’s entirely likely you will eventually encounter a situation where you need very fast access to a large amount of data for a short period of time. For example, let’s say you want to send an email to your customers and prospects letting them know about new features you’ve added to your product, but you also need to make certain you exclude folks you’ve already contacted this month.

The way you’d typically address this query in a big data system is by distributing your large contact list across many machines, and then loading the entirety of your list of folks contacted this month into memory on each machine and quickly checking each contact against your list of those you’ve already emailed. In MapReduce, this is often referred to as a “replicated join.” However, let’s assume you’ve got a large network of contacts consisting of many millions of email addresses you’ve collected from trade shows, product demos, and social media, and you like to contact these people fairly often. This means your list of folks you’ve already contacted this month could be fairly large and the entire list might not fit into the amount of memory you’ve got available on each machine.

What you really need is some way to pool memory across all your machines and let everyone refer back to that large pool. Memcached is a tool that lets you build such a distributed memory pool. To follow up on our previous example, you would store the entire list of folks who’ve already been emailed into your distributed memory pool and instruct all the different machines processing your full contact list to refer back to that memory pool instead of local memory.
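The idea of a pooled cache can be sketched in Python. This is a toy stand-in, not Memcached or any client library: the class name, the email addresses, and the hash-modulo placement rule are all assumptions made for illustration, and real clients use their own key-distribution schemes.

```python
import zlib


class MemoryPool:
    """Toy pooled cache: each key lives in the memory of exactly one server."""

    def __init__(self, servers):
        self.servers = list(servers)
        self.stores = {name: {} for name in self.servers}

    def _server_for(self, key):
        # A stable hash of the key decides which server holds it.
        index = zlib.crc32(key.encode("utf-8")) % len(self.servers)
        return self.servers[index]

    def set(self, key, value):
        self.stores[self._server_for(key)][key] = value

    def get(self, key):
        return self.stores[self._server_for(key)].get(key)


# Load the "already emailed this month" list into the pool once.
pool = MemoryPool(["server1:11211", "server2:11211"])
for address in ["ann@example.com", "bob@example.com", "cy@example.com"]:
    pool.set(address, True)

# Any worker can now check its slice of the contact list against the pool.
contacts = ["ann@example.com", "dee@example.com", "bob@example.com"]
to_email = [c for c in contacts if pool.get(c) is None]
print(to_email)
```

No single machine holds the whole list, yet every worker can check any address with one lookup.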

Tutorial Links

The spymemcached project has a handful of examples using its API available on its wiki.

Example Code

Let’s say we need to keep track of which reviewers have already reviewed which movies, so we don’t ask a reviewer to review the same movie twice. Because there is no single, officially supported Java client for Memcached, we’ll use the popular spymemcached client.

We’ll start by defining a client and pointing it at our Memcached servers:

MemcachedClient client = new MemcachedClient(

    AddrUtil.getAddresses("server1:11211 server2:11211"));

Now we’ll start loading data into our cache. We’ll use the popular OpenCSV library to read our reviews file and write an entry to our cache for every reviewer and title pair we find:

CSVReader reader = new CSVReader(new FileReader("reviews.csv"));

String [] line;

while ((line = reader.readNext()) != null) {

    //Merge the reviewer name and the movie title

    //into a single value (e.g., KevinDune)

    //that we'll use as a key

    String reviewerAndTitle = line[0] + line[1];

    //Write the key to our cache and store it for 30 minutes

    //(1800 seconds)

    client.set(reviewerAndTitle, 1800, true);

}

Once we have our values loaded into the cache, we can quickly check the cache from a MapReduce job or any other Java code:

Object myObject=client.get(aKey);

Blur

License

Apache License, Version 2.0

Activity

Medium

Purpose

Document Warehouse

Official Page

https://incubator.apache.org/blur

Hadoop Integration

Fully Integrated

Let’s say you’ve bought in to the entire big data story using Hadoop. You’ve got Flume gathering data and pushing it into HDFS, your MapReduce jobs are transforming that data and building key-value pairs that are pushed into HBase, and you even have a couple of enterprising data scientists using Mahout to analyze your data. At this point, your CTO walks up to you and asks how often one of your specific products is mentioned in a feedback form you are collecting from your users. Your heart drops as you realize the feedback is free-form text and you’ve got no way to search any of that data.

Blur is a tool for indexing and searching text with Hadoop. Because it has Lucene (a very popular text-indexing framework) at its core, it has many useful features, including fuzzy matching, wildcard searches, and paged results. It allows you to search through unstructured data in a way that would otherwise be very difficult.
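The core idea behind this kind of text index is an inverted index: a map from each term to the documents that contain it. The Python below is a minimal sketch of that idea only; it does not use Lucene or Blur, the function names are invented, and the wildcard search is limited to simple prefixes.

```python
import re
from collections import defaultdict

documents = {
    "review_1.json": "I was taken away with the movie's greatness!",
    "review_2.json": "I thought the movie was pretty terrible :(",
}

# Build the inverted index: term -> set of document ids containing it.
index = defaultdict(set)
for doc_id, text in documents.items():
    for term in re.findall(r"[a-z0-9]+", text.lower()):
        index[term].add(doc_id)


def search_exact(term):
    """Documents containing exactly this term."""
    return sorted(index.get(term.lower(), set()))


def search_prefix(prefix):
    """Documents containing any term that starts with the prefix (like great*)."""
    prefix = prefix.lower()
    hits = set()
    for term, doc_ids in index.items():
        if term.startswith(prefix):
            hits |= doc_ids
    return sorted(hits)


print(search_exact("movie"))
print(search_prefix("great"))
```

Once the index is built, a search touches only the terms, never the full text of every document.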

Tutorial Links

You can’t go wrong with the official “getting started” guide on the project home page. There is also an excellent, though slightly out of date, presentation from a Hadoop User Group meeting in 2011.

Example Code

There are a couple of different ways to load data into Blur. When you have large amounts of data you want to index in bulk, you will likely use MapReduce, whereas if you want to stream data in, you are likely better off with the mutation interface. In this case, we’re going to use the mutation interface, as we’re just going to index a couple of records:

import static org.apache.blur.thrift.util.BlurThriftHelper.*;

Iface aClient = BlurClient.getClient(

    "controller1:40010,controller2:40010");

//Create a new Row (with two review records) in the reviews table

RowMutation mutation1 = newRowMutation("reviews", "Dune",

    newRecordMutation("review", "review_1.json",

        newColumn("Reviewer", "Kevin"),

        newColumn("Rating", "10"),

        newColumn(

            "Text",

            "I was taken away with the movie's greatness!")

    ),

    newRecordMutation("review", "review_2.json",

        newColumn("Reviewer", "Marshall"),

        newColumn("Rating", "1"),

        newColumn(

            "Text",

            "I thought the movie was pretty terrible :(")

    )

);

aClient.mutate(mutation1);

Now let’s say we want to search for all reviews where the review text mentions something being great. We’re going to pull up the Blur shell by running bin/blur shell from our installation directory and run a simple query. This query tells Blur to look in the “Text” column of the review column family in the reviews table for anything that looks like the word “great”:

blur> query reviews review.Text:great

 - Results Summary -

    total : 1

    time : 41.372 ms

--------------------------------------------------------------

      hit : 0

    score : 0.9548232184568715

       id : Dune

 recordId : review_1.json

   family : review

     Text : I was taken away with the movie's greatness!

--------------------------------------------------------------

 - Results Summary -

    total : 1

    time  : 41.372 ms

Solr

License

Apache License, Version 2.0

Activity

High

Purpose

Document Warehouse

Official Page

https://lucene.apache.org/solr

Hadoop Integration

API Compatible

Sometimes you just want to search through a big stack of documents. Not all tasks require big, complex analysis jobs spanning petabytes of data. For many common use cases, you may find that you have too much data for a simple Unix grep or Windows search, but not quite enough to warrant a team of data scientists. Solr fits comfortably in that middle ground, providing an easy-to-use means to quickly index and search the contents of many documents.

Solr supports a distributed architecture that provides many of the benefits you expect from big data systems (e.g., linear scalability, data replication, and failover). It is based on Lucene, a popular framework for indexing and searching documents, and implements that framework by providing a set of tools for building indexes and querying data.

While Solr is able to use the Hadoop Distributed File System (HDFS; described here) to store data, it is not truly compatible with Hadoop and does not use MapReduce (described here) or YARN (described here) to build indexes or respond to queries. There is a similar effort named Blur (described here) to build a tool on top of the Lucene framework that leverages the entire Hadoop stack.

Tutorial Links

Apart from the tutorial on the official Solr home page, there is a Solr wiki with great information.

Example Code

In this example, we’re going to assume we have a set of semi-structured data consisting of movie reviews with labels that clearly mark the title and the text of the review. These reviews will be stored in individual JSON files in the reviews directory.

We’ll start by telling Solr to index our data; there are a handful of different ways to do this, all with unique trade-offs. In this case, we’re going to use the simplest mechanism, which is the post.sh script located in the exampledocs/ subdirectory of our Solr install:

./example/exampledocs/post.sh /reviews/*.json

Once our reviews have been indexed, they are ready to search. Solr has its own graphical user interface (GUI) that can be used for simple searches. We’ll pull up that GUI and search for movie reviews that contain the word “great”:

review_text:great&fl=title

This search tells Solr that we want to retrieve the title field (fl=title) for any review where the word “great” appears in the review_text field.
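The same search can be expressed as an HTTP query string. The sketch below only builds the URL and does not contact a server; the host, port, and the core name reviews are assumptions made for illustration, as is the helper function itself.

```python
from urllib.parse import urlencode


def build_select_url(base_url, field, word, return_field):
    """Build a search URL: q is the query, fl the field list to return."""
    params = {"q": f"{field}:{word}", "fl": return_field, "wt": "json"}
    return f"{base_url}/select?{urlencode(params)}"


# Hypothetical local Solr instance with a core named "reviews".
url = build_select_url(
    "http://localhost:8983/solr/reviews", "review_text", "great", "title")
print(url)
```

The colon in the query is percent-encoded by urlencode, so the q parameter still decodes to review_text:great on the server side.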

MongoDB

License

Free Software Foundation’s GNU AGPL v3.0; commercial licenses available from MongoDB, Inc.

Activity

High

Purpose

JSON document-oriented database

Official Page

http://www.mongodb.org

Hadoop Integration

API Compatible

If you have a large number of JSON documents (described here) in your Hadoop cluster and need some data management tool to effectively use them, consider MongoDB, an open source, big data, document-oriented database whose documents are JSON objects. As of the start of 2015, it is one of the most popular NoSQL databases. Unlike some other database systems, MongoDB supports secondary indexes, meaning it is possible to quickly search on fields other than the primary key that uniquely identifies each document in the Mongo database. The name derives from the slang word “humongous,” meaning very, very large. While MongoDB did not originally run on Hadoop and the HDFS, it can be used in conjunction with Hadoop.

MongoDB is a document-oriented database, the document being a JSON object. In relational databases, you have tables and rows. In MongoDB, the equivalent of a row is a JSON document, and the analog to a table is a collection, a set of JSON documents. To understand MongoDB, you should skip ahead to the “JSON” section of this book.

Perhaps the best way to understand its use is by way of a code example, shown in the next “Example Code” section.

Tutorial Links

The tutorials section on the official project page is a great place to get started. There are also plenty of videos available on the Internet, including this informative series.

Example Code

This time you’ll want to compute the average rating of the movie Dune in the standard dataset. If you know Python, this will be clear. If you don’t, the code is still pretty straightforward:

#!/usr/bin/python

# import required packages

import sys

import pymongo

# json movie reviews

movieReviews = [

    { "reviewer":"Kevin", "movie":"Dune", "rating":10 },

    { "reviewer":"Marshall", "movie":"Dune", "rating":1 },

    { "reviewer":"Kevin", "movie":"Casablanca", "rating":5 },

    { "reviewer":"Bob", "movie":"Blazing Saddles", "rating":9 }

]

# MongoDB connection info

MONGODB_INFO = 'mongodb://juser:password@localhost:27018/db'

# connect to MongoDB

client=pymongo.MongoClient(MONGODB_INFO)

db=client.get_default_database()

# create the movies collection

movies=db['movies']

#insert the movie reviews

movies.insert_many(movieReviews)

# find all the movies with title Dune, iterate through them

# finding all scores by using

# standard db cursor technology

mcur=movies.find({'movie': 'Dune'})

count=0

sum=0

# for all reviews of Dune, count them up and sum the rankings

for m in mcur:

    count += 1

    sum += m['rating']

client.close()

rank=float(sum)/float(count)

print ('Dune %s\n' % rank)
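The way a query document selects matching documents can be sketched without a database. The Python below is a toy model, not PyMongo: it supports only equality on top-level fields, scans the whole collection instead of using an index, and the function names and the extra note field are invented for the example.

```python
# A collection is a set of JSON-like documents; fields may differ per document.
movies = [
    {"reviewer": "Kevin", "movie": "Dune", "rating": 10},
    {"reviewer": "Marshall", "movie": "Dune", "rating": 1},
    {"reviewer": "Kevin", "movie": "Casablanca", "rating": 5},
    {"reviewer": "Bob", "movie": "Blazing Saddles", "rating": 9, "note": "rewatch"},
]


def matches(document, query):
    """True when every field in the query equals that field in the document."""
    return all(document.get(field) == value for field, value in query.items())


def find(collection, query):
    """Return the documents selected by an equality-only query document."""
    return [doc for doc in collection if matches(doc, query)]


dune = find(movies, {"movie": "Dune"})
average = sum(doc["rating"] for doc in dune) / len(dune)
print(average)
```

An empty query document matches everything, and adding more fields to the query narrows the result.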

Hive

License

Apache License, Version 2.0

Activity

High

Purpose

Data Interaction

Official Page

http://hive.apache.org

Hadoop Integration

Fully Integrated

At first, all access to data in your Hadoop cluster came through MapReduce jobs written in Java. This worked fine during Hadoop’s infancy when all Hadoop users had a stable of Java-savvy coders. However, as Hadoop emerged into the broader world, many wanted to adopt Hadoop but had stables of SQL coders for whom writing MapReduce would be a steep learning curve. Enter Hive. The goal of Hive is to allow SQL access to data in the HDFS. The Apache Hive data-warehouse software facilitates querying and managing large datasets residing in HDFS. Hive defines a simple SQL-like query language, called HQL, that enables users familiar with SQL to query the data. Queries written in HQL are converted into MapReduce code by Hive and executed by Hadoop. But beware! HQL is not full ANSI-standard SQL. While the basics are covered, some features are missing. Here’s a partial list as of early 2015:

§  Hive does not support non-equality join conditions.

§  Update and delete statements are not supported.

§  Transactions are not supported.

You may not need these, but if you run code generated by third-party solutions, they may generate code that is not Hive-compliant.

Hive does not mandate that the data it reads or writes be in a “Hive format”; there is no such thing. This means your data can be accessed directly by Hive without any of the extract, transform, and load (ETL) preprocessing typically required by traditional relational databases.

Tutorial Links

A couple of great resources are the official Hive tutorial and this video published by the folks at Hortonworks.

Example Code

Say we have a comma-separated values (CSV) file containing movie reviews with information about the reviewer, the movie, and the rating:

Kevin,Dune,10

Marshall,Dune,1

Kevin,Casablanca,5

Bob,Blazing Saddles,9

First, we need to define the schema for our data:

CREATE TABLE movie_reviews

    ( reviewer STRING, title STRING, rating INT)

ROW FORMAT DELIMITED

FIELDS TERMINATED BY ','

STORED AS TEXTFILE;

Next, we need to load the data by pointing the table at our movie reviews file. Because Hive doesn’t require that data be stored in any specific format, loading a table consists simply of pointing Hive at a file; the LOCAL keyword tells Hive to read the file from the local filesystem rather than from HDFS:

LOAD DATA LOCAL INPATH 'reviews.csv'

OVERWRITE INTO TABLE movie_reviews;

Now we are ready to perform some sort of analysis. Let’s say, in this case, we want to find the average rating for the movie Dune:

SELECT AVG(rating) FROM movie_reviews WHERE title = 'Dune';
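What this query computes, and what it means to apply a schema to a plain file at read time, can be sketched in Python. This is only an illustration of the idea and has nothing to do with how Hive executes the query; the schema list and function name are invented for the example.

```python
import csv
import io

# The raw file contents: no types, no column names, just delimited text.
raw = "Kevin,Dune,10\nMarshall,Dune,1\nKevin,Casablanca,5\nBob,Blazing Saddles,9\n"

# The schema is applied when the data is read, mirroring the table definition.
schema = [("reviewer", str), ("title", str), ("rating", int)]


def read_rows(text):
    """Parse each delimited line into a typed row according to the schema."""
    for fields in csv.reader(io.StringIO(text)):
        yield {name: cast(value) for (name, cast), value in zip(schema, fields)}


# Equivalent in spirit to: SELECT AVG(rating) ... WHERE title = 'Dune'
ratings = [row["rating"] for row in read_rows(raw) if row["title"] == "Dune"]
average = sum(ratings) / len(ratings)
print(average)
```

The file itself never changes; only the interpretation applied while reading it does.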

Spark SQL (formerly Shark)

License

Apache License, Version 2.0

Activity

High

Purpose

SQL access to Hadoop Data

Official Page

http://spark.apache.org/sql/

Hadoop Integration

API Compatible

If you need SQL access to your data, and Hive (described here) is a bit underperforming, and you’re willing to commit to a Spark environment (described here), then you need to consider Spark SQL. SQL access in Spark was originally called the Shark project, and was a port of Hive, but Shark has ceased development and its successor, Spark SQL, is now the mainline SQL project on Spark. The blog post “Shark, Spark SQL, Hive on Spark, and the Future of SQL on Spark” provides more information about the change. Spark SQL, like Spark, has an in-memory computing model, which helps to account for its speed. It’s only in recent years that decreasing memory costs have made large memory Linux servers ubiquitous, thus leading to recent advances in in-memory computing for large datasets. Because memory access times are usually 100 times as fast as disk access times, it’s quite appealing to keep as much in memory as possible, using the disks as infrequently as possible. But abandoning MapReduce has made Spark SQL much faster, even if it requires disk access.

While Spark SQL speaks HQL, the Hive query language, it has a few extra features that aren’t in Hive. One is the ability to cache table data for the duration of a user session. This corresponds to temporary tables in many other databases, but unlike other databases, these tables live in memory and are thus accessed much faster. Spark SQL also allows access to tables as though they were Spark Resilient Distributed Datasets (RDD).

Spark SQL supports the Hive metastore, most of its query language, and data formats, so existing Hive users should have an easier time converting to Spark SQL than many others. However, while the Spark SQL documentation is currently not absolutely clear on this, not all the Hive features have yet been implemented in Spark SQL. See “Hive” for more details. APIs currently exist for Python, Java, and Scala. Spark SQL can also run Spark’s MLlib machine-learning algorithms as SQL statements.

Spark SQL can use JSON (described here) and Parquet (described here) as data sources, so it’s pretty useful in an HDFS environment.

Tutorial Links

There is a wealth of tutorials on the project home page.

Example Code

At the user level, Spark SQL looks like Hive, so if you can code in Hive, you can almost code in Spark SQL. But you need to set up your Spark SQL environment. Here’s how you would do it in Python using the movie review data we use in other examples (to understand the setup, you’ll need to read “Spark”, as well as have some knowledge of Python):

# Spark requires a Context object.  Let's assume it exists

# already. You need a SQL Context object as well

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# Load the CSV text file and convert each line to a Python

# dictionary using lambda notation for anonymous functions.

lines = sc.textFile("reviews.csv")

movies = lines.map(lambda l: l.split(","))

reviews = movies.map(

    lambda p: {"name": p[0], "title": p[1], "rating": int(p[2])})

# Spark SQL needs to think of the RDD

# (Resilient Distributed Dataset) as a data schema

# and register the table name

schemaReviews = sqlContext.inferSchema(reviews)

schemaReviews.registerAsTable("reviews")

# once you've registered the RDD as a schema,

# you can run SQL statements over it.

dune_reviews = sqlContext.sql(

    "SELECT * FROM reviews WHERE title = 'Dune'")

Giraph

License

Apache License, Version 2.0

Activity

High

Purpose

Graph database

Official Page

https://giraph.apache.org

Hadoop Integration

Fully Integrated

You may know a parlor game called Six Degrees of Separation from Kevin Bacon in which movie trivia experts try to find the closest relationship between a movie actor and Kevin Bacon. If an actor is in the same movie, that’s a “path” of length 1. If an actor has never been in a movie with Kevin Bacon, but has been in a movie with an actor who has been, that’s a path of length 2. It rests on the assumption that any individual involved in the film industry can be linked through his or her film roles to Kevin Bacon within six steps, or six degrees of separation. For example, there is an arc between Kevin Bacon and Sean Penn, because they were both in Mystic River, so they have one degree of separation or a path of length 1. But Benicio Del Toro has a path of length 2 because he has never been in a movie with Kevin Bacon, but has been in one with Sean Penn.

You can show these relationships by means of a graph, a set of ordered pairs (N,M) which describe a connection from N to M.

You can think of a tree (such as a hierarchical filesystem) as a graph with a single source node or origin, and arcs leading down the tree branches. The set {(top, b1), (top, b2), (b1,c1), (b1,c2), (b2,c3)} is a tree rooted at top, with branches from top to b1 and b2, b1 to c1 and c2, and b2 to c3. The elements of the set {top, b1, b2, c1,c2,c3} are called the nodes.
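The ordered-pair representation translates directly into code. The short Python sketch below uses the same set of pairs as the text; the variable names are chosen for the example.

```python
from collections import defaultdict

# The tree from the text as a set of ordered pairs (parent, child).
edges = {("top", "b1"), ("top", "b2"), ("b1", "c1"), ("b1", "c2"), ("b2", "c3")}

# The nodes are every element that appears in some pair.
nodes = {node for edge in edges for node in edge}

# Group the arcs by their starting node to get each node's branches.
children = defaultdict(list)
for parent, child in sorted(edges):
    children[parent].append(child)

# The root is the only node that no arc points to.
targets = {child for _, child in edges}
roots = nodes - targets

print(sorted(nodes), dict(children), roots)
```

The same structure describes any directed graph; a tree is simply the special case with one root and no node reachable by two different paths.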

You will find graphs useful in describing relationships between entities. For example, if you had a collection of emails sent between people in your organization, you could build a graph where each node represents a person in your organization and an arc would exist between node a and node b if a sent an email to b or vice versa. It could look like Figure 2-2.

Figure 2-2. A graph detailing email relationships between people

Giraph is an Apache project to build and extract information from graphs. For example, you could use Giraph to calculate the shortest distance (number of arc hops) from one node in the graph to another or to calculate if there was a path between two nodes.

Apache Giraph is derived from a Google project called Pregel and has been used by Facebook to build and analyze a graph with a trillion edges, admittedly on a very large Hadoop cluster. It is built on a computing model called Bulk Synchronous Parallel (BSP).

The general notion is that there is a set of “supersteps” in the BSP model. In step zero, the vertices or nodes are distributed to worker processes. In each following superstep, each of the vertices iterates through a set of messages it received from the previous superstep and sends messages to other nodes to which it is connected.

In the Kevin Bacon example, each node represents an actor, director, producer, screenwriter, and so on. Each arc connects two people who are part of the same movie. We want to test the hypothesis that everyone in the industry is connected to Kevin Bacon within six hops in the graph. Each node is given an initial value for its number of hops; for Kevin Bacon, it is zero. For everyone else, the initial value is a very large integer. At the first superstep, each node sends its value to all the nodes connected to it. Then, at each of the other supersteps, each node first reads all its messages, takes the minimum value, and adds 1. If the result is less than its current value, the node adopts it as its new value and sends it to all its connected nodes at the end of the superstep. Why? Because if a connected node is N steps from Kevin, then this node is at most N+1 steps away. Once a node has established a new value, it opts out of sending more messages.

At the end of six supersteps, you’ll have all the persons connected to Kevin Bacon by six or fewer hops.
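The superstep procedure can be simulated on a single machine to make the message flow concrete. The Python below is a sketch of the idea only and does not use Giraph’s API: the function name is invented, infinity stands in for the “very large integer,” a node accepts a message only when the sender’s value plus 1 improves on its own, and the tiny graph contains just the two links mentioned earlier.

```python
INFINITY = float("inf")


def hops_from(source, edges, max_supersteps=6):
    """Simulate BSP supersteps that spread hop counts outward from source."""
    neighbors = {}
    for a, b in edges:
        neighbors.setdefault(a, set()).add(b)
        neighbors.setdefault(b, set()).add(a)

    value = {node: INFINITY for node in neighbors}
    value[source] = 0

    # First superstep: every node sends its current value to its neighbors.
    inbox = {node: [] for node in neighbors}
    for node in neighbors:
        for other in neighbors[node]:
            inbox[other].append(value[node])

    # Following supersteps: read messages, keep the minimum plus one hop,
    # and forward the new value only if it improved.
    for _ in range(max_supersteps):
        outbox = {node: [] for node in neighbors}
        for node, messages in inbox.items():
            if not messages:
                continue
            candidate = min(messages) + 1
            if candidate < value[node]:
                value[node] = candidate
                for other in neighbors[node]:
                    outbox[other].append(candidate)
        inbox = outbox
    return value


edges = [("Kevin Bacon", "Sean Penn"), ("Sean Penn", "Benicio Del Toro")]
hops = hops_from("Kevin Bacon", edges)
print(hops)
```

Anyone whose value is still infinite after the final superstep is more than six hops away, or not connected at all.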

Tutorial Links

The official project page has a quick start guide. In addition, there are a handful of videotaped talks, including one by PayPal and another by Facebook. Finally, there’s this particularly informative blog post.