Pig Design Patterns (2014)

Chapter 5. Data Transformation Patterns

In the last chapter, you learned about various patterns related to data validation and cleansing, which detect incorrect or inaccurate records in the data. By the time validation and cleansing are complete, the inconsistencies in the data have been identified, even before the data is used in the next steps of the analytics life cycle; the inconsistent data is then replaced, modified, or deleted to make it more consistent.

In this chapter, you will learn about various design patterns related to data transformation, such as structured to hierarchical, normalization, integration, aggregation, and generalization design patterns.

Data transformation processes

The process of data transformation is one of the fundamental building blocks and a vital step in the knowledge discovery process of Big Data analytics. Data transformation is an iterative process that modifies the source data into a format that enables the analytics algorithms to be applied effectively. Transformation improves the performance and accuracy of the algorithms by ensuring that the data is stored and retrieved in a format conducive to applying the analytics, and by improving the overall quality of the source data.

Data transformation for Big Data predominantly consists of the following major processes:

·        Normalization: This transformation scales the attribute data to bring it within a specified range. Typically, an attribute value is transformed to fit the range between 0 and 1. This is done to remove unwanted effects that certain attributes would otherwise have on the analytics. This normalization transformation is different from the first, second, and third normal forms used in relational database design.

·        Aggregation: This transformation applies summary operations on the data, for example, computing monthly and yearly summaries from the daily stock data, thus creating a data cube for performing analysis in multiple dimensions and granularities.

·        Generalization: In this transformation, the low-level raw data is replaced by higher level abstractions using concept hierarchies. For example, low-level data such as street can be replaced by higher abstractions such as city or state depending on the analytics use case.

·        Data integration: This is the process of joining data from multiple input pipelines of similar or dissimilar structure into a single output pipeline.

The following sections elaborate the most commonly used Pig design patterns that help in data transformation.

The structured-to-hierarchical transformation pattern

The structured-to-hierarchical transformation pattern deals with transforming the data by generating a hierarchical structure, such as XML or JSON from the structured data.

Background

The structured-to-hierarchical transformation pattern creates a new hierarchical structure, such as JSON or XML out of data that is stored in flat, row-like structures. It is a data transformation pattern that creates new records, which are represented in a different structure when compared to the original records.

Motivation

Hadoop is good at integrating data from multiple sources, but performing just-in-time joins for analytics is always a complex and time-consuming operation.

In order to perform certain types of analytics efficiently (such as logfile analysis), the data is sometimes not required to be stored in a normalized form in Hadoop. Storing the data in the normalized form in multiple tables creates an additional step of joining all the data together to perform analytics on it—joins are generally performed on normalized structured data for integrating it from multiple sources that have a foreign-key relationship.

Instead, the raw data is denormalized and nested in a hierarchical fashion. This pre-processing of the data ensures that the analysis is performed efficiently.

NoSQL databases, such as HBase, Cassandra, or MongoDB facilitate the storage of flat data in column families or JSON objects. Hadoop can be used to perform the task of integrating data from multiple sources in a batch mode and creating hierarchical data structures that could be readily inserted into these databases.

Use cases

This design pattern is predominantly applicable for integrating structured, row-based data from multiple disjoint data sources. The specific objective of this integration is to convert the data into a hierarchical structure so that the data is ready for analysis.

This pattern is also useful to convert data from a single source to a hierarchical structure, which is then used to load data into columnar and JSON databases.

Pattern implementation

Pig has out-of-the-box support for hierarchical data in the form of tuples and bags that are used to represent nested objects in a single row. The COGROUP operator groups data in one or more relations and creates a nested representation of the output tuples.

This design pattern is implemented in Pig as a standalone script. The script shows the usage of this pattern by generating a hierarchical representation of data from a structured format. The script loads a denormalized CSV file and passes it to a custom Java UDF. The Java UDF uses XMLParser to build an XML file. A custom storage function stores the result in the XML format.

Code snippets

To illustrate the working of this pattern, we have considered a manufacturing dataset stored on HDFS. The file production_all.csv contains denormalized data derived from production.csv and manufacturing_units.csv. We will convert the structured data from the CSV format to a hierarchical XML format.

The Pig script for the structured-to-hierarchical design pattern is as follows:

/*

Register the piggybank jar and the generateStoreXml jar, a custom storage function that generates an XML representation of the data and stores it

*/

REGISTER '/home/cloudera/pdp/jars/generateStoreXml.jar';

REGISTER '/usr/share/pig/contrib/piggybank/java/piggybank.jar';

/*

Load the production dataset into the relation production_details

*/

production_details = LOAD '/user/cloudera/pdp/datasets/data_transformation/production_all.csv' USING PigStorage(',') AS (production_date, production_hours, manufacturing_unit_id, manufacturing_unit_name, currency, product_id, product_name, quantity_produced);

/*

Call the custom store function TransformStoreXML to transform the contents into a hierarchical representation, i.e., XML, and to store it in the directory structured_to_hierarchical

*/

STORE production_details INTO '/user/cloudera/pdp/output/data_transformation/structured_to_hierarchical' USING com.xmlgenerator.TransformStoreXML('production_details','production_data');

The following is a snippet of the Java UDF used by the previous Pig script to perform structured-to-hierarchical transformation:

  /**

   * data from tuple is appended to xml root element

   * @param tuple

   */

  protected void write(Tuple tuple)

  {

    // Retrieving all fields from the schema

    ResourceFieldSchema[] fields = schema.getFields();

    //Retrieve values from tuple

    List<Object> values = tuple.getAll();

    /*Creating xml element by using fields as element tag

and tuple value as element value*/

    Element transactionElement = xmlDoc.createElement(TransformStoreXML.elementName);

    for(int counter=0;counter<fields.length;counter++)

    {

      //Retrieving element value from values

      String columnValue = String.valueOf(values.get(counter));

      //Creating element tag from fields

      Element columnName = xmlDoc.createElement(fields[counter].getName().toString().trim());

      //Appending value to element tag

      columnName.appendChild(xmlDoc.createTextNode(columnValue));

      //Appending element to transaction element

        transactionElement.appendChild(columnName);   

    }

    //Appending transaction element to root element

    rootElement.appendChild(transactionElement);

  }
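For reference, the same per-tuple logic (one child element per field, wrapped in a record element under a shared root) can be sketched in Python with the standard library. The element names mirror the example output; the record contents and helper name are illustrative, not part of the chapter's code:

```python
import xml.etree.ElementTree as ET

def tuple_to_element(root, record, element_name="production_data"):
    """Append one record element to the root, one child element per field."""
    elem = ET.SubElement(root, element_name)
    for field, value in record.items():
        # Field name becomes the element tag; field value becomes its text.
        ET.SubElement(elem, field).text = str(value)
    return elem

root = ET.Element("production_details")
tuple_to_element(root, {"production_date": "2011-01-01T00:00:00",
                        "product_id": "C001", "quantity_produced": 49})
xml_text = ET.tostring(root, encoding="unicode")
```

Each input tuple appends one more `production_data` element under the root, just as the UDF's `write` method does.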

Results

The following is a snippet of the XML file that is generated as a result of the code being executed on the input:

<?xml version="1.0" encoding="UTF-8" standalone="no" ?>

<production_details>

  <production_data>

    <production_date>2011-01-01T00:00:00</production_date>

    <production_hours>7</production_hours>

    <manufacturing_unit_id>1</manufacturing_unit_id>

    <manufacturing_unit_name>unit1</manufacturing_unit_name>

    <currency>USD</currency>

    <product_id>C001</product_id>

    <product_name>Refrigerator 180L</product_name>

    <quantity_produced>49</quantity_produced>

  </production_data>

  <production_data>

    .

    .

    .

  </production_data>

  .

  .

  .

</production_details>

Additional information

The complete code and datasets for this section are in the following GitHub directories:

·        Chapter5/code/

·        Chapter5/datasets/

The data normalization pattern

The data normalization design pattern discusses ways to perform normalization or standardization of data values.

Background

Normalization of data implies fitting, adjusting, or scaling the data values measured on different scales to a notionally common range. As a simplistic example, joining datasets that consist of different units for distance measurement, such as kilometers and miles, can provide varying results when they are not normalized. Hence, they are normalized to bring them back to one common unit, such as kilometers or miles, so that the effect of different units of measurement is not felt by the analytics.

Motivation

In Big Data, it is common to encounter varying values in the same data attribute while integrating multiple data sources.

Normalization performs data pre-processing and transformation by changing the original data and scaling it to bring it within a specified range (for example, in the range 0 to 1), and assigning equal weight for all attributes. Before normalization is performed on the data, any outliers are removed from it. Data normalization is required in scenarios when analytics can be affected by the choice of the unit of measurement and by values that are in the higher range.

Normalization is used in analytics such as clustering, a distance-based method, where it prevents attributes with higher values from dominating attributes with lower values. The techniques to perform normalization on numeric and non-numeric data are as follows:

·        Normalizing numeric data: Numeric data is normalized using methods such as min-max normalization, thus transforming the data to a value between a specified range [newMin, newMax]. The minValue and maxValue are usually identified from the dataset and the normalization is done by applying the following formula for each value:

normalizedValue = [((value - minValue) / (maxValue - minValue))*(newMax - newMin) + newMin]

·        Normalizing non-numeric data: Non-numeric data is normalized by first converting it into numeric data and then performing the normalization operation on it. As an example, if the values of a rating attribute can be excellent, very good, good, average, below average, poor, or worst, they can be converted into the numeric values 1 through 7; normalization can then be performed on these numeric values to fit them in the model.
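Both techniques can be sketched in a few lines of Python; the helper names and the ordinal codes assigned to the rating values are illustrative assumptions, not part of the chapter's scripts:

```python
def min_max_normalize(value, min_value, max_value, new_min=0.0, new_max=1.0):
    """Scale value from [min_value, max_value] into [new_min, new_max]."""
    return ((value - min_value) / (max_value - min_value)) * (new_max - new_min) + new_min

# Ordinal encoding for a non-numeric rating attribute (assumed 1-7 scale).
RATING_SCALE = {"excellent": 1, "very good": 2, "good": 3, "average": 4,
                "below average": 5, "poor": 6, "worst": 7}

def normalize_rating(rating):
    """Convert a rating to its ordinal code, then min-max normalize it."""
    code = RATING_SCALE[rating]
    return min_max_normalize(code, min(RATING_SCALE.values()), max(RATING_SCALE.values()))
```

With the default range [0, 1], `min_max_normalize(5, 0, 10)` yields 0.5, and the ratings map onto evenly spaced values from 0.0 (excellent) to 1.0 (worst).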

Use cases

You can consider using this design pattern as a pre-processing technique before performing analytics. This pattern can be used in analytics use cases to prevent attributes with initially higher values from dwarfing attributes with initially lower values.

This pattern can be considered as a method to encapsulate the original data as it transforms the original data by normalizing it.

Pattern implementation

This design pattern is implemented in Pig as a standalone script. The use case demonstrates normalization by identifying similar manufacturing units for a given product. The script loads the data and computes the total produced quantity and total production hours of the product C001 for each manufacturing unit, so that each manufacturing unit is represented by the product, total produced quantity, and total production hours. The script normalizes the total produced quantity and total production hours using the min-max normalization technique to bring all the values to the same scale (range 0 to 1). The script then computes the Euclidean distance between these points; the smaller the distance, the more similar the manufacturing units are.

Code snippets

To illustrate the working of this pattern, we have considered a manufacturing dataset stored on the HDFS. The file production.csv contains production information of each manufacturing unit; this file contains attributes such as production_date, production_hours, manufacturing_unit_id, product_id, and produced_quantity. We will be calculating total produced quantity and total production hours for each manufacturing unit for the product C001, as shown in the following code:

/*

Load the production dataset into the relation production

*/

production = LOAD '/user/cloudera/pdp/datasets/data_transformation/production.csv' USING PigStorage(',') AS (production_date:datetime, production_hours:int, manufacturing_unit_id:chararray, product_id:chararray, produced_quantity:int);

/*

Filter the relation production to fetch the records with product id C001

*/

production_filt = FILTER production BY product_id=='C001';

/*

Calculate the total production hours and total produced quantity of product C001 in each manufacturing unit

*/

production_grpd = GROUP production_filt BY (manufacturing_unit_id, product_id);

production_sum = FOREACH production_grpd GENERATE group.$0 AS manufacturing_unit_id, group.$1 AS product_id, (float)SUM(production_filt.production_hours) AS production_hours, (float)SUM(production_filt.produced_quantity) AS produced_quantity;

/*

Apply Min max normalization on total production hours and total produced quantity for each manufacturing unit to scale the data to fit in the range of [0-1]

*/

production_sum_grpd = GROUP production_sum ALL;

production_min_max = FOREACH production_sum_grpd GENERATE MIN(production_sum.production_hours)-1 AS min_hour, MAX(production_sum.production_hours)+1 AS max_hour, MIN(production_sum.produced_quantity)-1 AS min_qty, MAX(production_sum.produced_quantity)+1 AS max_qty;

production_norm = FOREACH production_sum

{

  norm_production_hours = (float)(((production_hours - production_min_max.min_hour)/(production_min_max.max_hour - production_min_max.min_hour))*(1-0))+0;

  norm_produced_quantity = (float)(((produced_quantity - production_min_max.min_qty)/(production_min_max.max_qty - production_min_max.min_qty))*(1-0))+0;

  GENERATE manufacturing_unit_id AS manufacturing_unit_id, product_id AS product_id, norm_production_hours AS production_hours, norm_produced_quantity AS produced_quantity;

}

prod_norm = FOREACH production_norm GENERATE manufacturing_unit_id AS manufacturing_unit_id, product_id AS product_id, production_hours AS production_hours, produced_quantity AS produced_quantity;

/*

Calculate the Euclidean distance to find out similar manufacturing units w.r.t the product C001

*/

manufacturing_units_euclidean_distance = FOREACH (CROSS production_norm, prod_norm) {

distance_between_points = (production_norm::production_hours - prod_norm::production_hours)*(production_norm::production_hours - prod_norm::production_hours) + (production_norm::produced_quantity - prod_norm::produced_quantity)*(production_norm::produced_quantity - prod_norm::produced_quantity);

GENERATE production_norm::manufacturing_unit_id, production_norm::product_id, prod_norm::manufacturing_unit_id, prod_norm::product_id, SQRT(distance_between_points) AS dist;

};

/*

The results are stored on the HDFS in the directory data_normalization

*/

STORE manufacturing_units_euclidean_distance INTO '/user/cloudera/pdp/output/data_transformation/data_normalization';

Results

The following is a snippet of the results that are generated as a result of the code being executed on the input:

1  C001  1  C001  0.0

1  C001  3  C001  1.413113776343348

1  C001  5  C001  0.2871426024640011

3  C001  1  C001  1.413113776343348

3  C001  3  C001  0.0

3  C001  5  C001  1.1536163027782005

5  C001  1  C001  0.2871426024640011

5  C001  3  C001  1.1536163027782005

5  C001  5  C001  0.0

The similarity between manufacturing units is calculated for one product (C001). As shown previously, manufacturing units 1 and 5 are similar with respect to the product C001, as the distance between them is smaller than the distance between the other pairs of units.
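The similarity computation above can be sketched outside Pig as well. This minimal Python version uses made-up per-unit totals (in the script, these come from aggregating production.csv) and omits the ±1 padding the script applies to min and max before scaling:

```python
import math

# Hypothetical (production_hours, produced_quantity) totals per unit for one product.
units = {"1": (1000.0, 4900.0), "3": (1400.0, 9100.0), "5": (1050.0, 5300.0)}

def normalize(units):
    """Min-max normalize each coordinate independently to the [0, 1] range."""
    hours = [h for h, _ in units.values()]
    qtys = [q for _, q in units.values()]
    def scale(v, vals):
        return (v - min(vals)) / (max(vals) - min(vals))
    return {u: (scale(h, hours), scale(q, qtys)) for u, (h, q) in units.items()}

def euclidean(a, b):
    return math.sqrt((a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2)

# Pairwise distances, mirroring the CROSS of the relation with itself.
norm = normalize(units)
distances = {(u, v): euclidean(norm[u], norm[v]) for u in norm for v in norm}
```

As in the script's output, the distance from each unit to itself is 0, the matrix is symmetric, and the pair with the smallest non-zero distance is the most similar.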

Additional information

The complete code and datasets for this section are in the following GitHub directories:

·        Chapter5/code/

·        Chapter5/datasets/

The data integration pattern

The data integration pattern deals with methods to integrate data from multiple sources and techniques to address data inconsistencies that arise out of this activity.

Background

This pattern discusses ways of integrating data from multiple sources. Data integration can sometimes lead to inconsistencies in the data, for example, different data sources may use different units of measurement. The data integration pattern deals with techniques to address data inconsistency.

Motivation

For a multitude of Big Data solutions, it is common for data to exist in various places, such as SQL tables, logfiles, and HDFS. In order to discover interesting relationships in data that lies in different places, it has to be ingested and integrated from the different sources. On the flip side, this integration of data from multiple sources can sometimes introduce data inconsistencies. The integration of data enables its enrichment by adding more attributes and giving it more meaning and context. It can also enable the filtering of data by removing unwanted details.

Data integration is achieved predominantly by the join operation. A join operation integrates records from more than one dataset based on a field called the foreign key. A foreign key is a field in one dataset that matches a column of another dataset, and it is used as a means to cross-refer between them. While this operation is fairly simple in SQL, the way MapReduce works makes it one of the costliest operations to perform on Hadoop.

The different types of joins can be understood by taking the example of two datasets, A and B. The following figure represents the values in each dataset.


The following are different types of joins that can be performed on the datasets:

·        Inner join: When this is performed on two datasets, all the matching records from both the datasets are returned. As shown in the following figure, it returns the matching records (2, 3) from both the datasets.


·        Left outer join: When this is performed on two datasets, all the matching records from both the datasets are returned along with the unmatched records from the dataset on the left-hand side. As shown in the following figure, the matched records (2, 3) along with the unmatched record in the dataset to the left (1) are returned.


·        Right outer join: When this is performed on two datasets, all the matching records from both the datasets are returned along with the unmatched records from the dataset on the right-hand side. As shown in the following figure, the matched records (2, 3) along with the unmatched record in the dataset to the right (4) are returned.


·        Full outer join: When this is applied on two datasets, all the matching records from both the datasets are returned along with the unmatched records from both datasets. As shown in the following figure, the matched records (2, 3) along with the unmatched records in both the datasets (1, 4) are returned.


·        Cartesian join: When the Cartesian join is performed on two datasets, each record from the first dataset is joined with every record of the second dataset. As shown in the following figure, the result would be (1, 2), (1, 3), (1, 4), (2, 2), (2, 3), (2, 4), (3, 2), (3, 3), and (3, 4).

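Assuming the example datasets are A = {1, 2, 3} and B = {2, 3, 4} (consistent with the results described above), the join semantics can be checked with a few lines of Python, using set operations to stand in for the figures:

```python
from itertools import product

A = {1, 2, 3}
B = {2, 3, 4}

inner = A & B                   # matching records only: {2, 3}
left_outer = inner | (A - B)    # plus unmatched records from the left: {1, 2, 3}
right_outer = inner | (B - A)   # plus unmatched records from the right: {2, 3, 4}
full_outer = A | B              # plus unmatched records from both sides: {1, 2, 3, 4}
cartesian = set(product(A, B))  # every pairing of a record from A with a record from B
```

Real joins pair whole records on a foreign key rather than comparing bare values, but the membership logic for each join type is exactly this.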

Combining data from multiple sources can result in data inconsistencies. Different data sources may use different units of measurement. For example, assume that there are two data sources, each using a different currency, say dollar versus euro. Due to this, the data integrated from these two sources is inconsistent. Another issue is that the data in each source may be represented differently, for example, true/false versus yes/no. You have to make use of data transformation to resolve these inconsistencies.

The two broad techniques to perform the join operations in Pig are as follows:

·        Reduce-side join: The first technique is called the reduce-side join in MapReduce terminology, and it uses the default join operator on multiple large datasets that have foreign-key relationships. This technique can execute any type of join operation (inner, outer, right, left, and so on) on the datasets, and it works on many datasets at once. Its biggest drawback is that it puts tremendous load on the network, as all of the data being joined is first sorted and then sent to the reducers, making this operation slow to execute.

·        Replicated join: The second technique is called the replicated join and uses the 'replicated' keyword along with the JOIN operator syntax. This join technique is applicable between one very large dataset and one or more small datasets. Internally, this join is performed on the map side alone and does not require the overhead of sorting and shuffling data to the reducers. The replicated join lets Pig distribute the smaller dataset (which is small enough to fit in memory) to each node so that it is joined directly in the map job, eliminating the need for the reduce job. Not all types of joins are supported by the replicated join; it supports only inner and left outer joins.
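The idea behind the replicated join can be illustrated in Python: the small dataset is loaded into an in-memory dictionary (standing in for the copy Pig distributes to every mapper), and the large dataset is streamed against it, so no sort or shuffle phase is needed. The record layouts here are made up for illustration:

```python
# Small dataset, assumed to fit in memory: product_id -> product_name.
products = {"C001": "Refrigerator 180L", "C002": "Refrigerator 360L"}

# Large dataset, streamed record by record (here just a list).
production = [
    ("2011-01-01", "1", "C001", 49),
    ("2011-01-01", "2", "C002", 12),
    ("2011-01-02", "1", "C003", 7),   # no match in products
]

def replicated_inner_join(stream, lookup):
    """Map-side inner join: emit only records whose key is in the lookup table."""
    for date, unit, product_id, qty in stream:
        if product_id in lookup:
            yield (date, unit, product_id, lookup[product_id], qty)

joined = list(replicated_inner_join(production, products))
```

A left outer join would instead emit unmatched records with a null in place of the looked-up fields, which is why these are the two join types a map-side join can support.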

Use cases

You can consider using this design pattern in the following scenarios:

·        When you need to combine data from multiple sources before applying analytics on it

·        To reduce the processing time by denormalizing the data; denormalization can be achieved by joining the transactions dataset with its associated master dataset(s)

·        To transform data to resolve data inconsistencies that might have been introduced as a result of data integration

·        To filter data using specific joins

Pattern implementation

This design pattern is implemented in Pig as a standalone script. It combines production information of all the manufacturing units, resolves data inconsistencies by transforming the data, and finds out whether each unit is performing to its optimal best.

The script first loads data of each manufacturing unit and combines them using UNION. It then denormalizes the data by applying joins on the production dataset with its master datasets to get manufacturing unit and product details. It has an implementation of the replicated join to join a huge production dataset with a smaller dataset called products. One of the units uses INR as its currency; this introduces data inconsistency. The script resolves this inconsistency by transforming the manufacturing cost attribute of the unit, which is in INR, to USD.

The script then compares the actual quantity produced with the expected quantity for each unit to find out whether each unit is performing optimally.

Code snippets

To illustrate the working of this pattern, we have considered a manufacturing dataset that is stored on the HDFS. It contains three master files: manufacturing_units.csv contains information about each manufacturing unit, products.csv contains details of the products that are manufactured, and manufacturing_units_products.csv holds detailed information of products that are manufactured in different manufacturing units. The production dataset has a separate production file for each manufacturing unit; this file contains attributes such as production_date, production_hours, manufacturing_unit_id, product_id, and produced_quantity. The following code is the Pig script illustrating the implementation of this pattern:

/*

Load the production datasets of five manufacturing units into the relations

*/

production_unit_1 = LOAD '/user/cloudera/pdp/datasets/data_transformation/production_unit_1.csv' USING PigStorage(',') AS (production_date:datetime, production_hours:int, manufacturing_unit_id:chararray, product_id:chararray, produced_quantity:int);

production_unit_2 = LOAD '/user/cloudera/pdp/datasets/data_transformation/production_unit_2.csv' USING PigStorage(',') AS (production_date:datetime, production_hours:int, manufacturing_unit_id:chararray, product_id:chararray, produced_quantity:int);

production_unit_3 = LOAD '/user/cloudera/pdp/datasets/data_transformation/production_unit_3.csv' USING PigStorage(',') AS (production_date:datetime, production_hours:int, manufacturing_unit_id:chararray, product_id:chararray, produced_quantity:int);

production_unit_4 = LOAD '/user/cloudera/pdp/datasets/data_transformation/production_unit_4.csv' USING PigStorage(',') AS (production_date:datetime, production_hours:int, manufacturing_unit_id:chararray, product_id:chararray, produced_quantity:int);

production_unit_5 = LOAD '/user/cloudera/pdp/datasets/data_transformation/production_unit_5.csv' USING PigStorage(',') AS (production_date:datetime, production_hours:int, manufacturing_unit_id:chararray, product_id:chararray, produced_quantity:int);

/*

Combine the data in the relations using UNION operator

*/

production = UNION production_unit_1, production_unit_2, production_unit_3, production_unit_4, production_unit_5;

/*

Load the manufacturing_units and manufacturing_units_products datasets

*/

manufacturing_units_products = LOAD '/user/cloudera/pdp/datasets/data_transformation/manufacturing_units_products.csv' USING PigStorage(',') AS (manufacturing_unit_id:chararray, product_id:chararray, capacity_per_hour:int, manufacturing_cost:float);

manufacturing_units = LOAD '/user/cloudera/pdp/datasets/data_transformation/manufacturing_units.csv' USING PigStorage(',') AS (manufacturing_unit_id:chararray, manufacturing_unit_name:chararray, manufacturing_unit_city:chararray, country:chararray, currency:chararray);

/*

Use replicated join to join the relation production, which is huge, with the smaller relation manufacturing_units_products.

The relations manufacturing_units_products and manufacturing_units are small enough to fit into memory

*/

replicated_join = JOIN production BY (manufacturing_unit_id, product_id), manufacturing_units_products BY (manufacturing_unit_id, product_id) USING 'replicated';

manufacturing_join = JOIN replicated_join BY production::manufacturing_unit_id, manufacturing_units BY manufacturing_unit_id USING 'replicated';

/*

Identify varying representation of currency and transform the values in the attribute manufacturing_cost to USD for the units that have INR as currency

*/

transformed_varying_values = FOREACH manufacturing_join GENERATE $0 AS production_date, $2 AS manufacturing_unit_id, $3 AS product_id, $4 AS actual_quantity_produced, ($1*$7) AS expected_quantity_produced, (float)((($13 == 'INR') ? ($8/60) : $8)*$4) AS manufacturing_cost;

/*

Calculate the expected quantity to be produced, actual quantity produced, percentage, total manufacturing cost for each month for each manufacturing unit and product to identify how each unit is performing

*/

transformed_varying_values_grpd = GROUP transformed_varying_values BY (GetMonth($0), manufacturing_unit_id, product_id);

quantity_produced = FOREACH transformed_varying_values_grpd

{

  expected_quantity_produced = SUM(transformed_varying_values.expected_quantity_produced);

  actual_quantity_produced = SUM(transformed_varying_values.actual_quantity_produced);

  percentage_quantity_produced = 100*actual_quantity_produced/expected_quantity_produced;

  manufacturing_cost = SUM(transformed_varying_values.manufacturing_cost);

  GENERATE group.$0 AS production_month, group.$1 AS manufacturing_unit_id, group.$2 AS product_id, expected_quantity_produced AS expected_quantity_produced, actual_quantity_produced AS actual_quantity_produced, percentage_quantity_produced AS percentage_quantity_produced, ROUND(manufacturing_cost) AS manufacturing_cost;

}

/*

Sort the relation by the percentage of quantity produced

*/

ordered_quantity_produced = ORDER quantity_produced BY $5 DESC;

/*

The results are stored on the HDFS in the directory data_integration

*/

STORE ordered_quantity_produced INTO '/user/cloudera/pdp/output/data_transformation/data_integration';

Results

The following is a snippet of the results that are generated as a result of the code being executed on the input:

6  2  C003  2400  2237    93  894800

10  2  C004  1984  1814  91  816300

12  3  L002  74400  66744  89  33372

The first column shows the month, the second column is the manufacturing unit id, and the third column represents the product id. The remaining columns show the expected quantity to be produced, the actual quantity produced, the percentage produced, and the total manufacturing cost for the month; based on these figures, the monthly performance of each unit can be analyzed.

Additional information

The complete code and datasets for this section are in the following GitHub directories:

·        Chapter5/code/

·        Chapter5/datasets/

The aggregation pattern

The aggregation design pattern explores the usage of Pig to transform data by applying summarization or aggregation operations on data.

Background

Aggregation provides a summarized, high-level view of the data. Aggregation combines more than one attribute into a single attribute, reducing the total number of records by treating a set of records as a single record or by ignoring unimportant subsets of records. Data aggregation can be performed at different levels of granularity.

Data aggregation retains the integrity of the data, though the volume of the resulting dataset is smaller than the original datasets.

Motivation

Data aggregation plays a key role in Big Data, as huge volumes of data are inherently difficult to interpret as a whole. Instead, data collected on a daily basis can be aggregated into weekly data, this weekly data can be aggregated into a value for the month, and so on. This enables data patterns to emerge, which can be used for analysis. A simple illustration would be to get more information about particular groups based on specific attributes, such as purchases by age, by segmenting age groups. The ability to aggregate data on specific attributes quickly provides valuable insights for conducting further analytics.

There are various techniques to aggregate data. The basic techniques employed for aggregating data include SUM, AVG, and COUNT; advanced techniques include CUBE and ROLLUP.

CUBE and ROLLUP are similar in many aspects, and they both summarize data to produce a single result set. ROLLUP calculates aggregations such as SUM, COUNT, MAX, MIN, and AVG at varying levels of hierarchy, from the subtotals up to a grand total.

CUBE enables the computation of SUM, COUNT, MAX, MIN, and AVG using all possible combinations of the values in the selected columns. Once this aggregation is computed on a set of columns, it can provide results of all possible aggregation questions on those dimensions.
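The distinction can be made concrete in terms of grouping sets: over n dimension columns, ROLLUP produces the n+1 prefix groupings of the column list (from full detail down to the grand total), while CUBE produces all 2^n subsets. A small Python sketch, with illustrative column names:

```python
from itertools import combinations

dims = ("manufacturing_unit_id", "product_id", "production_month")

def rollup_grouping_sets(columns):
    """ROLLUP: prefixes of the column list, from full detail down to the grand total."""
    return [columns[:i] for i in range(len(columns), -1, -1)]

def cube_grouping_sets(columns):
    """CUBE: every subset of the columns (2^n grouping sets)."""
    return [subset for r in range(len(columns), -1, -1)
            for subset in combinations(columns, r)]
```

For three dimensions, ROLLUP yields 4 grouping sets and CUBE yields 8; the empty grouping set corresponds to the grand total over all records.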

Use cases

You can consider using this design pattern to produce a summarized representation of the data. We will look at a few scenarios where it is necessary to replace data with either the summary or the aggregated information. Such aggregations are done before the data is sent for analytical processing. The aggregation design pattern can be used in the following specific scenarios:

·        Records containing transactional information can be aggregated based on multiple dimensions such as product or transaction date

·        Individual information such as the income of each member in a family can be summarized to represent the average family income

Pattern implementation

This design pattern is implemented as a standalone Pig script. The script has the implementation to aggregate data using CUBE and ROLLUP operators that were introduced in Pig Version 0.11.0.

Aggregation is a fundamental operation performed on the data in the transformation phase of Extract, Transform, and Load (ETL). The fastest way to aggregate data is to use ROLLUP and CUBE, and in most cases they provide the most meaningful aggregations of the data. This script loads production data of multiple manufacturing units. This data can be aggregated for various purposes. By applying ROLLUP on this data, we can get the following aggregations:

·        The production quantity of each product in each manufacturing unit for each month

·        The production quantity of each product in each manufacturing unit for all months

·        The quantity of total production in each manufacturing unit

·        The quantity of total production in all manufacturing units

By applying CUBE on the same dataset, we get the following aggregations in addition to the previous ones:

·        The production quantity in each manufacturing unit for each month

·        The production quantity of each product for each month

·        The production quantity of each product

·        The production quantity for each month

The additional four aggregations returned by CUBE are the result of its built-in capability to create subtotals for all possible combinations of grouping columns.
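The relationship between the two operators can be sketched in plain Python: ROLLUP over n columns produces the n+1 prefix grouping sets, while CUBE produces all 2^n subsets. This is only an illustration of the semantics (the book's scripts are in Pig Latin); the column names are taken from the example above.

```python
from itertools import combinations

def rollup_sets(cols):
    """Grouping sets produced by ROLLUP: prefixes of the column list,
    from the full list down to the empty (grand total) set."""
    return [tuple(cols[:i]) for i in range(len(cols), -1, -1)]

def cube_sets(cols):
    """Grouping sets produced by CUBE: every subset of the column list."""
    return [subset for r in range(len(cols), -1, -1)
            for subset in combinations(cols, r)]

dims = ["manufacturing_unit_id", "product_id", "production_month"]
extra = set(cube_sets(dims)) - set(rollup_sets(dims))
# For three columns, ROLLUP yields 4 grouping sets and CUBE yields 8;
# the difference is exactly the four extra combinations listed above.
```

Enumerating `extra` for these three columns gives (unit, month), (product, month), (product), and (month), matching the four additional aggregations described in the text.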

Code snippets

To illustrate the working of this pattern, we have considered a manufacturing dataset stored on the HDFS. It contains three master files: manufacturing_units.csv contains information about each manufacturing unit, products.csv contains details of the products that are manufactured, and manufacturing_units_products.csv holds detailed information of the products that are manufactured in different manufacturing units. The file production.csv contains the production information of each manufacturing unit; this file contains attributes such as production_date, production_hours, manufacturing_unit_id, product_id, and produced_quantity. We will be applying CUBE and ROLLUP aggregations on manufacturing_unit_id, product_id, and production_month, as shown in the following code:

/*

Load the data from production.csv, manufacturing_units_products.csv, manufacturing_units.csv files into the relations production, manufacturing_units_products and manufacturing_units

The files manufacturing_units_products.csv and manufacturing_units.csv contain master data information.

*/

production = LOAD '/user/cloudera/pdp/datasets/data_transformation/production.csv' USING PigStorage(',') AS (production_date:datetime, production_hours:int, manufacturing_unit_id:chararray, product_id:chararray, produced_quantity:int);

manufacturing_units_products = LOAD '/user/cloudera/pdp/datasets/data_transformation/manufacturing_units_products.csv' USING PigStorage(',') AS (manufacturing_unit_id:chararray, product_id:chararray, capacity_per_hour:int, manufacturing_cost:float);

manufacturing_units = LOAD '/user/cloudera/pdp/datasets/data_transformation/manufacturing_units.csv' USING PigStorage(',') AS (manufacturing_unit_id:chararray, manufacturing_unit_name:chararray, manufacturing_unit_city:chararray, country:chararray, currency:chararray);

/*

The relations are joined to get details from the master data.

*/

production_join_manufacturing_units_products = JOIN production BY (manufacturing_unit_id, product_id), manufacturing_units_products BY (manufacturing_unit_id, product_id);

manufacture_join = JOIN production_join_manufacturing_units_products BY production::manufacturing_unit_id, manufacturing_units BY manufacturing_unit_id;

/*

The manufacturing cost attribute is converted to dollars for the units that have currency as INR.

*/

transformed_varying_values = FOREACH manufacture_join GENERATE $2 AS manufacturing_unit_id, $3 AS product_id, GetMonth($0) AS production_month, ((($13 == 'INR') ? ($8/60) : $8) * $4) AS manufacturing_cost;

/*

Apply CUBE and ROLLUP aggregations on manufacturing_unit_id, product_id, production_month and store the results in the relations result_cubed and result_rolledup

*/

cubed = CUBE transformed_varying_values BY CUBE(manufacturing_unit_id, product_id, production_month);

rolledup = CUBE transformed_varying_values BY ROLLUP(manufacturing_unit_id, product_id, production_month);

result_cubed = FOREACH cubed GENERATE FLATTEN(group), ROUND(SUM(cube.manufacturing_cost)) AS total_manufacturing_cost;

result_rolledup = FOREACH rolledup GENERATE FLATTEN(group), ROUND(SUM(cube.manufacturing_cost)) AS total_manufacturing_cost;

/*

The results are stored on the HDFS in the directories cube and rollup

*/

STORE result_cubed INTO '/user/cloudera/pdp/output/data_transformation/data_aggregation/cube';

STORE result_rolledup INTO '/user/cloudera/pdp/output/data_transformation/data_aggregation/rollup';

Results

After applying ROLLUP on manufacturing_unit_id, product_id, and production_month, the following combinations of results are produced:

·        The production quantity of each product in each manufacturing unit for each month is as follows:

1  C001  1   536600
5  C002  12  593610

·        The production quantity of each product in each manufacturing unit for all months is as follows:

1  C001    7703200
2  C003    10704000
5  C002    7139535

·        The total production quantity in each manufacturing unit is as follows:

1      15719450
4      15660186

·        The total production quantity in all manufacturing units is as follows:

      69236355

After applying CUBE on manufacturing_unit_id, product_id, and production_month, the following combinations in addition to the combinations produced by ROLLUP are obtained:

·        The production quantity in each manufacturing unit for each month is as follows:

1    4   1288250
5    12  1166010

·        The production quantity of each product for each month is as follows:

C001  8   1829330
L002  12  101748
L001  10  36171

·        The production quantity of each product is as follows:

C002    15155785
C004    16830110
L002    667864

·        The production quantity for each month is as follows:

2   5861625
10  5793634
11  5019340

As shown previously, CUBE returns four additional aggregations (production quantity in each manufacturing unit for each month, production quantity of each product for each month, production quantity of each product, and production quantity for each month) when compared to ROLLUP. This is because CUBE has the built-in capability of creating subtotals for all possible combinations of grouping columns.
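The rolled-up subtotals above can also be understood procedurally: each input row contributes to one total per grouping set, with the rolled-up columns replaced by a null marker. The following Python sketch mimics this behavior on a handful of made-up rows (the quantities are illustrative, not the book's dataset, and `None` stands in for Pig's NULL in ROLLUP output):

```python
from collections import defaultdict

# Toy production rows: (manufacturing_unit_id, product_id, production_month, quantity).
# The values are invented for illustration only.
rows = [
    ("1", "C001", 1, 500), ("1", "C001", 2, 300),
    ("2", "C003", 1, 700), ("2", "C003", 2, 200),
]

def rollup_sum(rows):
    """Accumulate a SUM for each ROLLUP grouping set, using None where a
    column has been rolled up."""
    totals = defaultdict(int)
    for unit, product, month, qty in rows:
        for key in [(unit, product, month),  # per unit, product, and month
                    (unit, product, None),   # per unit and product, all months
                    (unit, None, None),      # per unit
                    (None, None, None)]:     # grand total
            totals[key] += qty
    return dict(totals)

totals = rollup_sum(rows)
# totals[("1", "C001", None)] == 800; totals[(None, None, None)] == 1700
```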

Additional information

The complete code and datasets for this section are in the following GitHub directories:

·        Chapter5/code/

·        Chapter5/datasets/

The data generalization pattern

The data generalization pattern deals with transforming the data by creating concept hierarchies and replacing the data with these hierarchies.

Background

This design pattern explores the implementation of data generalization through a Pig script. Data generalization is the process of creating top-level summary layers called concept hierarchies that describe the underlying data concept in a general form. It is a descriptive approach in which the data is grouped and replaced by higher-level categories or concepts using concept hierarchies. For example, the raw values of the attribute age can be replaced with conceptual labels (such as adult, teenager, or toddler), or they can be replaced by interval labels (0 to 5, 13 to 19, and so on). These labels, in turn, can be recursively organized into higher-level concepts, resulting in a concept hierarchy for the attribute.
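The age example can be sketched as a simple mapping function. Note that the label names and interval boundaries below are illustrative assumptions; the pattern itself does not fix them:

```python
def age_concept(age):
    """Map a raw age to a conceptual label. The boundaries chosen here
    (toddler <= 5, teenager 13-19, adult >= 20) are illustrative only."""
    if age <= 5:
        return "toddler"
    elif 13 <= age <= 19:
        return "teenager"
    elif age >= 20:
        return "adult"
    return "child"  # ages 6-12, filling the gap between the labels above

ages = [3, 15, 42]
labels = [age_concept(a) for a in ages]  # ["toddler", "teenager", "adult"]
```

Interval labels such as "0 to 5" or "13 to 19" would be produced the same way, by returning range strings instead of category names.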

Motivation

In the context of Big Data, a typical analytics pipeline on huge volumes of data requires the integration of multiple structured and unstructured datasets.

The data generalization process reduces the footprint of the data to be analyzed in the Hadoop cluster by using generalized data that is described in a concise and summarized manner. Instead of analyzing the entire corpus of the data, the data generalization process presents general properties of the data in the form of concept hierarchies, which helps to get a broader, zoomed-out view of an analytics trend quickly and is useful for mining at multiple levels of abstraction.

Applying data generalization may result in the loss of detail, but the resultant generalized data is more meaningful and easier to interpret in some of the analytics use cases.

Organizing data in top-level concept hierarchies enables a consistent representation of data when placed among multiple data analytics pipelines. In addition to this, analytics on a reduced dataset requires fewer input/output operations and less network bandwidth, and it is more efficient than analytics on a larger, ungeneralized dataset.

Owing to these benefits, data generalization is typically applied before analytics as a pre-processing step, rather than applying it during the mining process. There are various techniques for performing data generalization on numerical data, such as binning, histogram analysis, entropy-based discretization, chi-square analysis, cluster analysis, and discretization by intuitive partitioning. Similarly, for categorical data, generalization can be performed based on the number of distinct values of the attributes that define the hierarchy.

Use cases

You can consider using this design pattern to produce a generalized representation of the numeric and categorical structured data in analytics scenarios, where it is necessary to generalize the data for consistency using a higher-level summary rather than low-level raw data.

You can also consider using this pattern right after the data integration process as an analytics accelerator to create a reduced dataset, making it more amenable for efficient analytics.

Pattern implementation

This design pattern is implemented as a standalone Pig script. The script generates concept hierarchies for categorical data based on a number of distinct values per attribute.

The script performs the join operation on the manufacturing_units_products, products, components, and products_components relations. It then generates the concept hierarchy by selecting distinct values from the attributes components and products; the attributes are sorted in ascending order of their distinct-value counts. This generates a hierarchy based on the sorted order: the first attribute is at the top level of the hierarchy and the last attribute is at the bottom level.
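The core of this technique, counting distinct values per attribute and ordering the attributes by those counts, can be sketched in Python (the toy product/component pairs below are invented for illustration; the book's implementation is the Pig script that follows):

```python
# Toy product/component pairs standing in for products_components.csv.
products_components = [
    ("P1", "C1"), ("P1", "C2"), ("P2", "C3"),
    ("P2", "C4"), ("P3", "C5"),
]

def concept_hierarchy(rows):
    """Order attributes by their distinct-value counts, ascending:
    the attribute with the fewest distinct values sits at the top
    of the generated concept hierarchy."""
    counts = {
        "Products": len({product for product, _ in rows}),
        "Components": len({component for _, component in rows}),
    }
    return sorted(counts.items(), key=lambda kv: kv[1])

hierarchy = concept_hierarchy(products_components)
# [("Products", 3), ("Components", 5)] - Products tops the hierarchy
```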

Code snippets

The master dataset components.csv contains component details, and the products_components.csv file contains component details and the count of components that are required to manufacture a product. This file contains attributes such as product_id, component_id, and required_quantity. The following code is the Pig script illustrating the implementation of this pattern:

/*

Load products_components data set into the relation products_components

*/

products_components = LOAD '/user/cloudera/pdp/datasets/data_transformation/products_components.csv' USING PigStorage(',') AS (product_id:chararray, component_id:chararray, required_qty_per_Unit:int);

/*

Calculate the distinct count for product_id and component_id and store the results in the relations products_unique_count and components_unique_count

*/

products_components_grpd = GROUP products_components ALL;

products_unique_count = FOREACH products_components_grpd

{

  attribute_name = 'Products';

  distinct_prod = DISTINCT products_components.product_id;

  GENERATE attribute_name AS attribute_name, COUNT(distinct_prod) AS attribute_count;

}

components_unique_count = FOREACH products_components_grpd

{

  attribute_name = 'Components';

  distinct_comp = DISTINCT products_components.component_id;

  GENERATE attribute_name AS attribute_name, COUNT(distinct_comp) AS attribute_count;

}

/*

The relations products_unique_count and components_unique_count are combined using the UNION operator.

The resulting relation contains two columns, attribute_name and attribute_count; it is then sorted by attribute_count

*/

combined_products_components_count = UNION products_unique_count, components_unique_count;

ordered_count = ORDER combined_products_components_count BY attribute_count ASC;

/*

The results are stored on the HDFS in the directory data_generalization

*/

STORE ordered_count INTO '/user/cloudera/pdp/output/data_transformation/data_generalization';

Results

The following is the result of applying generalization on categorical data:

Products    6

Components  18

The result shows each attribute name and its unique count; the attributes are ordered by their counts. The result depicts the concept hierarchy: the first attribute, Products, is at the top level of the hierarchy, and the last attribute, Components, is at the bottom level.

Additional information

The complete code and datasets for this section are in the following GitHub directories:

·        Chapter5/code/

·        Chapter5/datasets/

Summary

In this chapter, you have studied various Big Data transformation techniques that deal with transforming the structure of the data to a hierarchical representation to take advantage of Hadoop's capability to process semistructured data. We have seen the importance of performing normalization on the data before performing analysis on it. We then discussed using joins to denormalize the data. CUBE and ROLLUP perform multiple aggregations on the data; these aggregations provide a snapshot of the data. In data generalization, we discussed various generalization techniques for numerical and categorical data.

In the next chapter, we will focus on data reduction techniques. Data reduction aims to obtain a reduced representation of the data; it ensures data integrity, though the obtained dataset is much smaller in volume. We will discuss data reduction techniques such as dimensionality reduction, sampling techniques, binning, and clustering. After reading this chapter, you will be able to choose the right data reduction pattern.