Hadoop: The Definitive Guide (2015)

Part IV. Related Projects

Chapter 13. Parquet

Apache Parquet is a columnar storage format that can efficiently store nested data.

Columnar formats are attractive since they enable greater efficiency, in terms of both file size and query performance. File sizes are usually smaller than row-oriented equivalents since in a columnar format the values from one column are stored next to each other, which usually allows a very efficient encoding. A column storing a timestamp, for example, can be encoded by storing the first value and the differences between subsequent values (which tend to be small due to temporal locality: records from around the same time are stored next to each other). Query performance is improved too since a query engine can skip over columns that are not needed to answer a query. (This idea is illustrated in Figure 5-4.) This chapter looks at Parquet in more depth, but there are other columnar formats that work with Hadoop — notably ORCFile (Optimized Record Columnar File), which is a part of the Hive project.
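
To make the timestamp example concrete, here is a minimal sketch of delta encoding in plain Java. The class and method names are hypothetical, and the output is not Parquet's actual on-disk layout; it only shows why a column of nearby timestamps turns into a run of small numbers.

```java
import java.util.Arrays;

/** Illustrative delta encoding for a column of timestamps (not Parquet's real format). */
final class DeltaEncodingSketch {

  /** Stores the first value as-is, then each difference from the previous value. */
  static long[] encode(long[] values) {
    long[] out = new long[values.length];
    for (int i = 0; i < values.length; i++) {
      out[i] = (i == 0) ? values[0] : values[i] - values[i - 1];
    }
    return out;
  }

  /** Rebuilds the original values with a running sum over the deltas. */
  static long[] decode(long[] deltas) {
    long[] out = new long[deltas.length];
    for (int i = 0; i < deltas.length; i++) {
      out[i] = (i == 0) ? deltas[0] : out[i - 1] + deltas[i];
    }
    return out;
  }

  public static void main(String[] args) {
    // Four records written within about a second of each other (epoch milliseconds).
    long[] timestamps = {1420070400000L, 1420070400250L, 1420070401000L, 1420070401020L};
    long[] deltas = encode(timestamps);
    System.out.println(Arrays.toString(deltas));
    System.out.println(Arrays.equals(decode(deltas), timestamps));
  }
}
```

For the sample input the encoded array is [1420070400000, 250, 750, 20]: every value after the first is small enough to fit in far fewer bits than a full 64-bit long.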

A key strength of Parquet is its ability to store data that has a deeply nested structure in true columnar fashion. This is important since schemas with several levels of nesting are common in real-world systems. Parquet uses a novel technique for storing nested structures in a flat columnar format with little overhead, which was introduced by Google engineers in the Dremel paper.[86] The result is that even nested fields can be read independently of other fields, resulting in significant performance improvements.

Another feature of Parquet is the large number of tools that support it as a format. The engineers at Twitter and Cloudera who created Parquet wanted it to be easy to try new tools to process existing data, so to facilitate this they divided the project into a specification (parquet-format), which defines the file format in a language-neutral way, and implementations of the specification for different languages (Java and C++) that made it easy for tools to read or write Parquet files. In fact, most of the data processing components covered in this book understand the Parquet format (MapReduce, Pig, Hive, Cascading, Crunch, and Spark). This flexibility also extends to the in-memory representation: the Java implementation is not tied to a single representation, so you can use in-memory data models for Avro, Thrift, or Protocol Buffers to read your data from and write it to Parquet files.

Data Model

Parquet defines a small number of primitive types, listed in Table 13-1.

Table 13-1. Parquet primitive types

Type                  Description
boolean               Binary value
int32                 32-bit signed integer
int64                 64-bit signed integer
int96                 96-bit signed integer
float                 Single-precision (32-bit) IEEE 754 floating-point number
double                Double-precision (64-bit) IEEE 754 floating-point number
binary                Sequence of 8-bit unsigned bytes
fixed_len_byte_array  Fixed number of 8-bit unsigned bytes

The data stored in a Parquet file is described by a schema, which has at its root a message containing a group of fields. Each field has a repetition (required, optional, or repeated), a type, and a name. Here is a simple Parquet schema for a weather record:

message WeatherRecord {
  required int32 year;
  required int32 temperature;
  required binary stationId (UTF8);
}

Notice that there is no primitive string type. Instead, Parquet defines logical types that specify how primitive types should be interpreted, so there is a separation between the serialized representation (the primitive type) and the semantics that are specific to the application (the logical type). Strings are represented as binary primitives with a UTF8 annotation. Some of the logical types defined by Parquet are listed in Table 13-2, along with a representative example schema of each. Among those not listed in the table are signed integers, unsigned integers, more date/time types, and JSON and BSON document types. See the Parquet specification for details.

Table 13-2. Parquet logical types

Logical type annotation: UTF8
Description: A UTF-8 character string. Annotates binary.
Schema example:
  message m {
    required binary a (UTF8);
  }

Logical type annotation: ENUM
Description: A set of named values. Annotates binary.
Schema example:
  message m {
    required binary a (ENUM);
  }

Logical type annotation: DECIMAL(precision,scale)
Description: An arbitrary-precision signed decimal number. Annotates int32, int64, binary, or fixed_len_byte_array.
Schema example:
  message m {
    required int32 a (DECIMAL(5,2));
  }

Logical type annotation: DATE
Description: A date with no time value. Annotates int32. Represented by the number of days since the Unix epoch (January 1, 1970).
Schema example:
  message m {
    required int32 a (DATE);
  }

Logical type annotation: LIST
Description: An ordered collection of values. Annotates group.
Schema example:
  message m {
    required group a (LIST) {
      repeated group list {
        required int32 element;
      }
    }
  }

Logical type annotation: MAP
Description: An unordered collection of key-value pairs. Annotates group.
Schema example:
  message m {
    required group a (MAP) {
      repeated group key_value {
        required binary key (UTF8);
        optional int32 value;
      }
    }
  }
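
The split between primitive and logical types can be sketched in plain Java. The helper names below are hypothetical and no Parquet classes are involved; the point is only that the stored value is an ordinary int32 or byte sequence, and the annotation tells a reader how to interpret it.

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;

/** Hypothetical helpers showing what the primitive value behind a logical type looks like. */
final class LogicalTypeSketch {

  /** DATE: the number of days since the Unix epoch, held in an int32. */
  static int toDateInt32(LocalDate date) {
    return Math.toIntExact(date.toEpochDay());
  }

  /**
   * DECIMAL(precision, scale) on int32: only the unscaled integer is stored;
   * the scale is part of the schema annotation. Throws if the value would
   * need rounding to fit the scale or does not fit in 32 bits.
   */
  static int toDecimalInt32(BigDecimal value, int scale) {
    return value.setScale(scale).unscaledValue().intValueExact();
  }

  /** UTF8: the string's UTF-8 bytes, held in a binary primitive. */
  static byte[] toUtf8Binary(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    System.out.println(toDateInt32(LocalDate.of(2015, 1, 1)));
    System.out.println(toDecimalInt32(new BigDecimal("123.45"), 2));
    System.out.println(toUtf8Binary("stationId").length);
  }
}
```

With a DECIMAL(5,2) annotation, for instance, the value 123.45 is held as the int32 12345, and a reader recovers the decimal point from the scale in the schema.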

Complex types in Parquet are created using the group type, which adds a layer of nesting.[87] A group with no annotation is simply a nested record.

Lists and maps are built from groups with a particular two-level group structure, as shown in Table 13-2. A list is represented as a LIST group with a nested repeating group (called list) that contains an element field. In this example, a list of 32-bit integers has a required int32 element field. For maps, the outer group a (annotated MAP) contains an inner repeating group key_value that contains the key and value fields. In this example, the values have been marked optional so that it’s possible to have null values in the map.

Nested Encoding

In a column-oriented store, a column’s values are stored together. For a flat table where there is no nesting and no repetition — such as the weather record schema — this is simple enough since each column has the same number of values, making it straightforward to determine which row each value belongs to.

In the general case where there is nesting or repetition — such as the map schema — it is more challenging, since the structure of the nesting needs to be encoded too. Some columnar formats avoid the problem by flattening the structure so that only the top-level columns are stored in column-major fashion (this is the approach that Hive’s RCFile takes, for example). A map with nested columns would be stored in such a way that the keys and values are interleaved, so it would not be possible to read only the keys, say, without also reading the values into memory.

Parquet uses the encoding from Dremel, where every primitive type field in the schema is stored in a separate column, and for each value written, the structure is encoded by means of two integers: the definition level and the repetition level. The details are intricate,[88] but you can think of storing definition and repetition levels like this as a generalization of using a bit field to encode nulls for a flat record, where the non-null values are written one after another.
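
For the flat case, the analogy with a null bit field can be sketched as follows. This is a simplification under stated assumptions: a single optional, non-repeated top-level field, for which the maximum definition level is 1 and the repetition level is always 0, so only definition levels are modeled. The class names are hypothetical and are not part of Parquet's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of one optional, non-repeated column (hypothetical names). */
final class DefinitionLevelSketch {

  /** The stored form: one definition level per record, plus only the non-null values. */
  static final class Column {
    final int[] definitionLevels; // 0 = null, 1 = value present
    final List<String> values;    // non-null values, written one after another

    Column(int[] definitionLevels, List<String> values) {
      this.definitionLevels = definitionLevels;
      this.values = values;
    }
  }

  /** Splits a column with nulls into levels and a dense list of values. */
  static Column encode(String[] records) {
    int[] levels = new int[records.length];
    List<String> values = new ArrayList<>();
    for (int i = 0; i < records.length; i++) {
      if (records[i] != null) {
        levels[i] = 1;
        values.add(records[i]);
      }
    }
    return new Column(levels, values);
  }

  /** Walks the levels, consuming the next dense value wherever the level is 1. */
  static String[] decode(Column column) {
    String[] records = new String[column.definitionLevels.length];
    int next = 0; // position in the dense value list
    for (int i = 0; i < records.length; i++) {
      if (column.definitionLevels[i] == 1) {
        records[i] = column.values.get(next++);
      }
    }
    return records;
  }

  public static void main(String[] args) {
    Column column = encode(new String[] {"a", null, "b"});
    System.out.println(Arrays.toString(column.definitionLevels) + " " + column.values);
    System.out.println(Arrays.toString(decode(column)));
  }
}
```

With nesting, the levels take values greater than 1 and repetition levels come into play, but the principle is the same: the levels carry the structure, and the values are stored densely.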

The upshot of this encoding is that any column (even nested ones) can be read independently of the others. In the case of a Parquet map, for example, the keys can be read without accessing any of the values, which can result in significant performance improvements, especially if the values are large (such as nested records with many fields).

Parquet File Format

A Parquet file consists of a header followed by one or more blocks, terminated by a footer. The header contains only a 4-byte magic number, PAR1, that identifies the file as being in Parquet format, and all the file metadata is stored in the footer. The footer’s metadata includes the format version, the schema, any extra key-value pairs, and metadata for every block in the file. The final two fields in the footer are a 4-byte field encoding the length of the footer metadata, and the magic number again (PAR1).

The consequence of storing the metadata in the footer is that reading a Parquet file requires an initial seek to the end of the file (minus 8 bytes) to read the footer metadata length, then a second seek backward by that length to read the footer metadata. Unlike sequence files and Avro datafiles, where the metadata is stored in the header and sync markers are used to separate blocks, Parquet files don’t need sync markers since the block boundaries are stored in the footer metadata. (This is possible because the metadata is written after all the blocks have been written, so the writer can retain the block boundary positions in memory until the file is closed.) Therefore, Parquet files are splittable, since the blocks can be located after reading the footer and can then be processed in parallel (by MapReduce, for example).
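
The two-step footer read can be sketched with an in-memory byte array standing in for a file. The sketch assumes the 4-byte length field is little-endian, as the Parquet format specification defines it, and uses placeholder bytes for the metadata, which in a real file is a Thrift-encoded structure. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Locates the footer metadata in a byte array laid out like a Parquet file. */
final class FooterLocatorSketch {

  static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

  /** Lays out: magic, blocks, footer metadata, 4-byte metadata length, magic. */
  static byte[] buildFile(byte[] blocks, byte[] footerMetadata) {
    ByteBuffer buf = ByteBuffer
        .allocate(4 + blocks.length + footerMetadata.length + 4 + 4)
        .order(ByteOrder.LITTLE_ENDIAN);
    buf.put(MAGIC).put(blocks).put(footerMetadata)
        .putInt(footerMetadata.length).put(MAGIC);
    return buf.array();
  }

  /** Reads the length 8 bytes from the end, then steps back by that length. */
  static byte[] readFooterMetadata(byte[] file) {
    int n = file.length;
    if (n < 12
        || !Arrays.equals(Arrays.copyOfRange(file, 0, 4), MAGIC)
        || !Arrays.equals(Arrays.copyOfRange(file, n - 4, n), MAGIC)) {
      throw new IllegalArgumentException("Missing PAR1 magic number");
    }
    // First "seek": end of file minus 8 bytes, where the length field starts.
    int metadataLength =
        ByteBuffer.wrap(file, n - 8, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
    // Second "seek": backward by the metadata length.
    int start = n - 8 - metadataLength;
    if (metadataLength < 0 || start < 4) {
      throw new IllegalArgumentException("Corrupt footer length: " + metadataLength);
    }
    return Arrays.copyOfRange(file, start, n - 8);
  }

  public static void main(String[] args) {
    byte[] metadata = {1, 2, 3, 4, 5};             // placeholder, not real Thrift
    byte[] file = buildFile(new byte[100], metadata);
    System.out.println(Arrays.toString(readFooterMetadata(file)));
  }
}
```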

Each block in a Parquet file stores a row group, which is made up of column chunks containing the column data for those rows. The data for each column chunk is written in pages; this is illustrated in Figure 13-1.

Figure 13-1. The internal structure of a Parquet file

Each page contains values from the same column, making a page a very good candidate for compression since the values are likely to be similar. The first level of compression is achieved through how the values are encoded. The simplest encoding is plain encoding, where values are written in full (e.g., an int32 is written using a 4-byte little-endian representation), but this doesn’t afford any compression in itself.
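
As a small illustration of plain encoding, the following sketch writes int32 values as 4-byte little-endian integers using java.nio. The class name is hypothetical, and page headers and everything else that surrounds the values in a real page are left out.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

/** Plain encoding of int32 values: each value written in full, 4 bytes, little-endian. */
final class PlainEncodingSketch {

  static byte[] encodeInt32(int[] values) {
    ByteBuffer buf = ByteBuffer.allocate(4 * values.length).order(ByteOrder.LITTLE_ENDIAN);
    for (int v : values) {
      buf.putInt(v);
    }
    return buf.array();
  }

  public static void main(String[] args) {
    // 258 is 0x00000102, so its least significant byte (0x02) comes first.
    System.out.println(Arrays.toString(encodeInt32(new int[] {1, 258})));
  }
}
```

The output is always four bytes per value regardless of how small the values are, which is why plain encoding gives no compression by itself.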

Parquet also uses more compact encodings, including delta encoding (the difference between values is stored), run-length encoding (sequences of identical values are encoded as a single value and the count), and dictionary encoding (a dictionary of values is built and itself encoded, then values are encoded as integers representing the indexes in the dictionary). In most cases, it also applies techniques such as bit packing to save space by storing several small values in a single byte.
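
Two of these ideas, run-length encoding and bit packing, can be sketched in a few lines of plain Java. This is an illustration of the techniques only: Parquet's actual encodings combine and lay them out differently, and the class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative run-length encoding and bit packing (not Parquet's on-disk layout). */
final class RunLengthSketch {

  /** Encodes runs as flattened pairs: [value0, count0, value1, count1, ...]. */
  static int[] runLengthEncode(int[] values) {
    List<Integer> pairs = new ArrayList<>();
    int start = 0;
    while (start < values.length) {
      int end = start;
      while (end < values.length && values[end] == values[start]) {
        end++;
      }
      pairs.add(values[start]);  // the repeated value
      pairs.add(end - start);    // how many times it repeats
      start = end;
    }
    int[] out = new int[pairs.size()];
    for (int i = 0; i < out.length; i++) {
      out[i] = pairs.get(i);
    }
    return out;
  }

  /** Packs booleans eight to a byte, least significant bit first. */
  static byte[] packBits(boolean[] flags) {
    byte[] out = new byte[(flags.length + 7) / 8];
    for (int i = 0; i < flags.length; i++) {
      if (flags[i]) {
        out[i / 8] |= (byte) (1 << (i % 8));
      }
    }
    return out;
  }
}
```

For example, runLengthEncode on the values 7, 7, 7, 7, 3, 3, 9 yields the pairs (7, 4), (3, 2), (9, 1), and packBits stores nine Boolean flags in two bytes rather than nine.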

When writing files, Parquet will choose an appropriate encoding automatically, based on the column type. For example, Boolean values will be written using a combination of run-length encoding and bit packing. Most types are encoded using dictionary encoding by default; however, a plain encoding will be used as a fallback if the dictionary becomes too large. The threshold size at which this happens is referred to as the dictionary page size and is the same as the page size by default (so the dictionary has to fit into one page if it is to be used). Note that the encoding that is actually used is stored in the file metadata to ensure that readers use the correct encoding.
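
The dictionary-with-fallback behavior can be sketched as follows. The sketch is deliberately simplified: it measures the dictionary as the sum of the UTF-8 lengths of its entries and gives up on the whole column once the limit is exceeded, which is an assumption made for illustration rather than a description of how the Parquet writer accounts for size. All names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative dictionary encoding with a size limit (hypothetical names). */
final class DictionaryEncodingSketch {

  static final class Encoded {
    final List<String> dictionary; // distinct values in first-seen order
    final int[] indexes;           // one dictionary index per input value

    Encoded(List<String> dictionary, int[] indexes) {
      this.dictionary = dictionary;
      this.indexes = indexes;
    }
  }

  /**
   * Returns the dictionary-encoded column, or null to signal that the
   * dictionary grew past maxDictionaryBytes and the caller should fall
   * back to plain encoding.
   */
  static Encoded encode(String[] values, int maxDictionaryBytes) {
    Map<String, Integer> ids = new LinkedHashMap<>();
    int[] indexes = new int[values.length];
    int dictionaryBytes = 0;
    for (int i = 0; i < values.length; i++) {
      Integer id = ids.get(values[i]);
      if (id == null) {
        dictionaryBytes += values[i].getBytes(StandardCharsets.UTF_8).length;
        if (dictionaryBytes > maxDictionaryBytes) {
          return null; // dictionary too large: fall back
        }
        id = ids.size();
        ids.put(values[i], id);
      }
      indexes[i] = id;
    }
    return new Encoded(new ArrayList<>(ids.keySet()), indexes);
  }
}
```

Encoding the values SFO, JFK, SFO, SFO, JFK with a generous limit produces the dictionary [SFO, JFK] and the indexes 0, 1, 0, 0, 1; with a limit of 4 bytes the second distinct value pushes the dictionary over the limit and the sketch signals a fallback.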

In addition to the encoding, a second level of compression can be applied using a standard compression algorithm on the encoded page bytes. By default, no compression is applied, but Snappy, gzip, and LZO compressors are all supported.

For nested data, each page will also store the definition and repetition levels for all the values in the page. Since levels are small integers (the maximum is determined by the amount of nesting specified in the schema), they can be very efficiently encoded using a bit-packed run-length encoding.
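
To see why levels are cheap to store, consider how many bits are needed for a level given the maximum level allowed by the schema. The helper below is a hypothetical one-liner, not a Parquet API.

```java
/** Number of bits needed to represent any level in the range 0..maxLevel. */
final class LevelBitWidthSketch {

  static int bitWidth(int maxLevel) {
    // 32 minus the count of leading zero bits gives the position of the highest set bit.
    return 32 - Integer.numberOfLeadingZeros(maxLevel);
  }

  public static void main(String[] args) {
    for (int maxLevel = 0; maxLevel <= 4; maxLevel++) {
      System.out.println(maxLevel + " -> " + bitWidth(maxLevel));
    }
  }
}
```

A flat schema with only required fields has a maximum level of 0 and needs no bits at all, while a schema whose maximum level is 3 needs just 2 bits per value before run-length encoding is applied.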

Parquet Configuration

Parquet file properties are set at write time. The properties listed in Table 13-3 are appropriate if you are creating Parquet files from MapReduce (using the formats discussed in Parquet MapReduce), Crunch, Pig, or Hive.

Table 13-3. ParquetOutputFormat properties

Property name                 Type     Default value       Description
parquet.block.size            int      134217728 (128 MB)  The size in bytes of a block (row group).
parquet.page.size             int      1048576 (1 MB)      The size in bytes of a page.
parquet.dictionary.page.size  int      1048576 (1 MB)      The maximum allowed size in bytes of a dictionary before falling back to plain encoding for a page.
parquet.enable.dictionary     boolean  true                Whether to use dictionary encoding.
parquet.compression           String   UNCOMPRESSED        The type of compression to use for Parquet files: UNCOMPRESSED, SNAPPY, GZIP, or LZO. Used instead of mapreduce.output.fileoutputformat.compress.

Setting the block size is a trade-off between scanning efficiency and memory usage. Larger blocks are more efficient to scan through since they contain more rows, which improves sequential I/O (as there’s less overhead in setting up each column chunk). However, each block is buffered in memory for both reading and writing, which limits how large blocks can be. The default block size is 128 MB.

The Parquet file block size should be no larger than the HDFS block size for the file so that each Parquet block can be read from a single HDFS block (and therefore from a single datanode). It is common to set them to be the same, and indeed both defaults are for 128 MB block sizes.

A page is the smallest unit of storage in a Parquet file, so retrieving an arbitrary row (with a single column, for the sake of illustration) requires that the page containing the row be decompressed and decoded. Thus, for single-row lookups, it is more efficient to have smaller pages, so there are fewer values to read through before reaching the target value. However, smaller pages incur a higher storage and processing overhead, due to the extra metadata (offsets, dictionaries) resulting from more pages. The default page size is 1 MB.

Writing and Reading Parquet Files

Most of the time Parquet files are processed using higher-level tools like Pig, Hive, or Impala, but sometimes low-level sequential access may be required, which we cover in this section.

Parquet has a pluggable in-memory data model to facilitate integration of the Parquet file format with a wide range of tools and components. ReadSupport and WriteSupport are the integration points in Java, and implementations of these classes do the conversion between the objects used by the tool or component and the objects used to represent each Parquet type in the schema.

To demonstrate, we’ll use a simple in-memory model that comes bundled with Parquet in the parquet.example.data and parquet.example.data.simple packages. Then, in the next section, we’ll use an Avro representation to do the same thing.

NOTE

As the names suggest, the example classes that come with Parquet are an object model for demonstrating how to work with Parquet files; for production, one of the supported frameworks should be used (Avro, Protocol Buffers, or Thrift).

To write a Parquet file, we need to define a Parquet schema, represented by an instance of parquet.schema.MessageType:

MessageType schema = MessageTypeParser.parseMessageType(
    "message Pair {\n" +
    "  required binary left (UTF8);\n" +
    "  required binary right (UTF8);\n" +
    "}");

Next, we need to create an instance of a Parquet message for each record to be written to the file. For the parquet.example.data package, a message is represented by an instance of Group, constructed using a GroupFactory:

GroupFactory groupFactory = new SimpleGroupFactory(schema);
Group group = groupFactory.newGroup()
    .append("left", "L")
    .append("right", "R");

Notice that the values in the message are UTF8 logical types, and Group provides a natural conversion from a Java String for us.

The following snippet of code shows how to create a Parquet file and write a message to it. The write() method would normally be called in a loop to write multiple messages to the file, but this only writes one here:

Configuration conf = new Configuration();
Path path = new Path("data.parquet");
GroupWriteSupport writeSupport = new GroupWriteSupport();
GroupWriteSupport.setSchema(schema, conf);
ParquetWriter<Group> writer = new ParquetWriter<Group>(path, writeSupport,
    ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME,
    ParquetWriter.DEFAULT_BLOCK_SIZE,
    ParquetWriter.DEFAULT_PAGE_SIZE,
    ParquetWriter.DEFAULT_PAGE_SIZE, /* dictionary page size */
    ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
    ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
    ParquetProperties.WriterVersion.PARQUET_1_0, conf);
writer.write(group);
writer.close();

The ParquetWriter constructor needs to be provided with a WriteSupport instance, which defines how the message type is translated to Parquet’s types. In this case, we are using the Group message type, so GroupWriteSupport is used. Notice that the Parquet schema is set on the Configuration object by calling the setSchema() static method on GroupWriteSupport, and then the Configuration object is passed to ParquetWriter. This example also illustrates the Parquet file properties that may be set, corresponding to the ones listed in Table 13-3.

Reading a Parquet file is simpler than writing one, since the schema does not need to be specified as it is stored in the Parquet file. (It is, however, possible to set a read schema to return a subset of the columns in the file, via projection.) Also, there are no file properties to be set since they are set at write time:

GroupReadSupport readSupport = new GroupReadSupport();
ParquetReader<Group> reader = new ParquetReader<Group>(path, readSupport);

ParquetReader has a read() method to read the next message. It returns null when the end of the file is reached:

Group result = reader.read();
assertNotNull(result);
assertThat(result.getString("left", 0), is("L"));
assertThat(result.getString("right", 0), is("R"));
assertNull(reader.read());

Note that the 0 parameter passed to the getString() method specifies the index of the value to retrieve within the field, since fields may have repeated values.

Avro, Protocol Buffers, and Thrift

Most applications will prefer to define models using a framework like Avro, Protocol Buffers, or Thrift, and Parquet caters to all of these cases. Instead of ParquetWriter and ParquetReader, use AvroParquetWriter, ProtoParquetWriter, or ThriftParquetWriter, and the respective reader classes. These classes take care of translating between Avro, Protocol Buffers, or Thrift schemas and Parquet schemas (as well as performing the equivalent mapping between the framework types and Parquet types), which means you don’t need to deal with Parquet schemas directly.

Let’s repeat the previous example but using the Avro Generic API, just like we did in In-Memory Serialization and Deserialization. The Avro schema is:

{
  "type": "record",
  "name": "StringPair",
  "doc": "A pair of strings.",
  "fields": [
    {"name": "left", "type": "string"},
    {"name": "right", "type": "string"}
  ]
}

We create a schema instance and a generic record with:

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(getClass().getResourceAsStream("StringPair.avsc"));
GenericRecord datum = new GenericData.Record(schema);
datum.put("left", "L");
datum.put("right", "R");

Then we can write a Parquet file:

Path path = new Path("data.parquet");
AvroParquetWriter<GenericRecord> writer =
    new AvroParquetWriter<GenericRecord>(path, schema);
writer.write(datum);
writer.close();

AvroParquetWriter converts the Avro schema into a Parquet schema, and also translates each Avro GenericRecord instance into the corresponding Parquet types to write to the Parquet file. The file is a regular Parquet file: it is identical to the one written in the previous section using ParquetWriter with GroupWriteSupport, except for an extra piece of metadata to store the Avro schema. We can see this by inspecting the file’s metadata using Parquet’s command-line tools:[89]

% parquet-tools meta data.parquet
...
extra:       avro.schema = {"type":"record","name":"StringPair", ...
...

Similarly, to see the Parquet schema that was generated from the Avro schema, we can use the following:

% parquet-tools schema data.parquet
message StringPair {
  required binary left (UTF8);
  required binary right (UTF8);
}

To read the Parquet file back, we use an AvroParquetReader and get back Avro GenericRecord objects:

AvroParquetReader<GenericRecord> reader =
    new AvroParquetReader<GenericRecord>(path);
GenericRecord result = reader.read();
assertNotNull(result);
assertThat(result.get("left").toString(), is("L"));
assertThat(result.get("right").toString(), is("R"));
assertNull(reader.read());

Projection and read schemas

It’s often the case that you only need to read a few columns in the file, and indeed this is the raison d’être of a columnar format like Parquet: to save time and I/O. You can use a projection schema to select the columns to read. For example, the following schema will read only the right field of a StringPair:

{
  "type": "record",
  "name": "StringPair",
  "doc": "The right field of a pair of strings.",
  "fields": [
    {"name": "right", "type": "string"}
  ]
}

In order to use a projection schema, set it on the configuration using the setRequestedProjection() static convenience method on AvroReadSupport:

Schema projectionSchema = parser.parse(
    getClass().getResourceAsStream("ProjectedStringPair.avsc"));
Configuration conf = new Configuration();
AvroReadSupport.setRequestedProjection(conf, projectionSchema);

Then pass the configuration into the constructor for AvroParquetReader:

AvroParquetReader<GenericRecord> reader =
    new AvroParquetReader<GenericRecord>(conf, path);
GenericRecord result = reader.read();
assertNull(result.get("left"));
assertThat(result.get("right").toString(), is("R"));

Both the Protocol Buffers and Thrift implementations support projection in a similar manner. In addition, the Avro implementation allows you to specify a reader’s schema by calling setReadSchema() on AvroReadSupport. This schema is used to resolve Avro records according to the rules listed in Table 12-4.

The reason that Avro has both a projection schema and a reader’s schema is that the projection must be a subset of the schema used to write the Parquet file, so it cannot be used to evolve a schema by adding new fields.

The two schemas serve different purposes, and you can use both together. The projection schema is used to filter the columns to read from the Parquet file. Although it is expressed as an Avro schema, it can be viewed simply as a list of Parquet columns to read back. The reader’s schema, on the other hand, is used only to resolve Avro records. It is never translated to a Parquet schema, since it has no bearing on which columns are read from the Parquet file. For example, if we added a description field to our Avro schema (like in Schema Resolution) and used it as the Avro reader’s schema, then the records would contain the default value of the field, even though the Parquet file has no such field.
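
The division of labor between the two schemas can be modeled with ordinary maps. The following is a toy model only: it uses no Avro or Parquet classes, the names are hypothetical, and a map of field names to default values stands in for a reader's schema.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of projection followed by reader's-schema resolution. */
final class ProjectionSketch {

  /** Projection: pick which stored columns to read; it must be a subset of the file's columns. */
  static Map<String, String> project(Map<String, String> fileRecord, List<String> projection) {
    Map<String, String> out = new LinkedHashMap<>();
    for (String column : projection) {
      if (!fileRecord.containsKey(column)) {
        throw new IllegalArgumentException("Not a column in the file: " + column);
      }
      out.put(column, fileRecord.get(column));
    }
    return out;
  }

  /** Resolution: shape the record to the reader's fields, using defaults for absent ones. */
  static Map<String, String> resolve(Map<String, String> projected,
                                     Map<String, String> readerFieldDefaults) {
    Map<String, String> out = new LinkedHashMap<>();
    for (Map.Entry<String, String> field : readerFieldDefaults.entrySet()) {
      String name = field.getKey();
      out.put(name, projected.containsKey(name) ? projected.get(name) : field.getValue());
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> fileRecord = new LinkedHashMap<>();
    fileRecord.put("left", "L");
    fileRecord.put("right", "R");

    Map<String, String> readerFieldDefaults = new LinkedHashMap<>();
    readerFieldDefaults.put("right", null);       // present in the file, default unused
    readerFieldDefaults.put("description", "");   // absent from the file, default applies

    Map<String, String> projected = project(fileRecord, Arrays.asList("right"));
    System.out.println(resolve(projected, readerFieldDefaults));
  }
}
```

Only the projection step decides which columns are touched; the resolution step never reaches back into the file, which is why a field such as description can appear in the result with its default value.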

Parquet MapReduce

Parquet comes with a selection of MapReduce input and output formats for reading and writing Parquet files from MapReduce jobs, including ones for working with Avro, Protocol Buffers, and Thrift schemas and data.

The program in Example 13-1 is a map-only job that reads text files and writes Parquet files where each record is the line’s offset in the file (represented by an int64 — converted from a long in Avro) and the line itself (a string). It uses the Avro Generic API for its in-memory data model.

Example 13-1. MapReduce program to convert text files to Parquet files using AvroParquetOutputFormat

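// Imports use the parquet.* package names referred to elsewhere in this chapter;
// later Parquet releases moved these classes under org.apache.parquet.
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import parquet.avro.AvroParquetOutputFormat;

public class TextToParquetWithAvro extends Configured implements Tool {

  // Avro schema for one output record: the line's byte offset and its text
  private static final Schema SCHEMA = new Schema.Parser().parse(
      "{\n" +
      "  \"type\": \"record\",\n" +
      "  \"name\": \"Line\",\n" +
      "  \"fields\": [\n" +
      "    {\"name\": \"offset\", \"type\": \"long\"},\n" +
      "    {\"name\": \"line\", \"type\": \"string\"}\n" +
      "  ]\n" +
      "}");

  public static class TextToParquetMapper
      extends Mapper<LongWritable, Text, Void, GenericRecord> {

    // Reused for every input line to avoid allocating a record per call
    private GenericRecord record = new GenericData.Record(SCHEMA);

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      record.put("offset", key.get());
      record.put("line", value.toString());
      context.write(null, record);
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    Job job = Job.getInstance(getConf(), "Text to Parquet");
    job.setJarByClass(getClass());

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(TextToParquetMapper.class);
    job.setNumReduceTasks(0); // map-only job

    job.setOutputFormatClass(AvroParquetOutputFormat.class);
    AvroParquetOutputFormat.setSchema(job, SCHEMA);

    job.setOutputKeyClass(Void.class);
    job.setOutputValueClass(GenericRecord.class); // matches the mapper's output value type

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new TextToParquetWithAvro(), args);
    System.exit(exitCode);
  }
}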

The job’s output format is set to AvroParquetOutputFormat, and the output key and value types are set to Void and GenericRecord to match, since we are using Avro’s Generic API. Void simply means that the key is always set to null.

Like AvroParquetWriter from the previous section, AvroParquetOutputFormat converts the Avro schema to a Parquet schema automatically. The Avro schema is set on the Job instance so that the MapReduce tasks can find the schema when writing the files.

The mapper is straightforward; it takes the file offset (key) and line (value) and builds an Avro GenericRecord object with them, which it writes out to the MapReduce context object as the value (the key is always null). AvroParquetOutputFormat takes care of the conversion of the Avro GenericRecord to the Parquet file format encoding.

WARNING

Parquet is a columnar format, so it buffers rows in memory. Even though the mapper in this example just passes values through, it must have sufficient memory for the Parquet writer to buffer each block (row group), which is by default 128 MB. If you get job failures due to out of memory errors, you can adjust the Parquet file block size for the writer with parquet.block.size (see Table 13-3). You may also need to change the MapReduce task memory allocation (when reading or writing) using the settings discussed in Memory settings in YARN and MapReduce.

The following command runs the program on the four-line text file quangle.txt:

% hadoop jar parquet-examples.jar TextToParquetWithAvro \
  input/docs/quangle.txt output

We can use the Parquet command-line tools to dump the output Parquet file for inspection:

% parquet-tools dump output/part-m-00000.parquet
INT64 offset
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 4 ***
value 1: R:0 D:0 V:0
value 2: R:0 D:0 V:33
value 3: R:0 D:0 V:57
value 4: R:0 D:0 V:89

BINARY line
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 4 ***
value 1: R:0 D:0 V:On the top of the Crumpetty Tree
value 2: R:0 D:0 V:The Quangle Wangle sat,
value 3: R:0 D:0 V:But his face you could not see,
value 4: R:0 D:0 V:On account of his Beaver Hat.

Notice how the values within a row group are shown together. V indicates the value, R the repetition level, and D the definition level. For this schema, the latter two are zero since there is no nesting.

[86] Sergey Melnik et al., Dremel: Interactive Analysis of Web-Scale Datasets, Proceedings of the 36th International Conference on Very Large Data Bases, 2010.

[87] This is based on the model used in Protocol Buffers, where groups are used to define complex types like lists and maps.

[88] Julien Le Dem’s exposition is excellent.

[89] The Parquet tools can be downloaded as a binary tarball from the Parquet Maven repository. Search for “parquet-tools” on http://search.maven.org.