UNION Pattern

Let's say we have two datasets with different schemas, and we would like to take the union of these datasets for further processing. We must first transform both datasets so that they share the same schema. As an example,

// Inputs:
pv = LOAD "pageviews.avro" USING AVRO; // memberId:LONG, page_key:STRING
search = LOAD "searches.avro" USING AVRO; //  memberId:LONG, query:STRING

// Transformation of pageviews
dimTable = LOAD CACHED "dimTable" USING AVRO; // page_key:STRING, search_type: STRING
pv1 = HASH-JOIN pv BY page_key, dimTable BY page_key;
pv1 = FROM pv1 GENERATE pv___memberId AS memberId, dimTable___search_type AS search_type;

// Transformation of searches
search1 = FROM search GENERATE memberId, CASE (query MATCHES "*people*", "PEOPLE", true, "OTHER") AS search_type;

// pv1 and search1 have the same schema now
// we want the union of pv1 and search1

In general, the UNION pattern involves more than one input dataset, each with a different schema, where each dataset is transformed separately into a common schema.
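To make the semantics concrete, here is a minimal Python sketch of the pattern. It is not Cubert Script. The sample rows, the `dim_table` dictionary and the helper names are hypothetical. It also assumes that the join keeps only pageviews with a matching `page_key`, and that the `MATCHES "*people*"` test means "the query contains the word people".

```python
# Hypothetical sample rows; field names follow the schemas in the example above.
pageviews = [
    {"memberId": 1, "page_key": "people_search"},
    {"memberId": 2, "page_key": "job_search"},
]
searches = [
    {"memberId": 1, "query": "find people in sales"},
    {"memberId": 3, "query": "python jobs"},
]
# Stand-in for dimTable: maps page_key to search_type.
dim_table = {"people_search": "PEOPLE", "job_search": "OTHER"}


def transform_pageviews(rows, dim):
    """Join pageviews with the dimension table on page_key (assumed inner join)."""
    return [
        {"memberId": r["memberId"], "search_type": dim[r["page_key"]]}
        for r in rows
        if r["page_key"] in dim
    ]


def transform_searches(rows):
    """Label each query PEOPLE if it contains "people" (assumed meaning), else OTHER."""
    return [
        {
            "memberId": r["memberId"],
            "search_type": "PEOPLE" if "people" in r["query"] else "OTHER",
        }
        for r in rows
    ]


pv1 = transform_pageviews(pageviews, dim_table)
search1 = transform_searches(searches)

# Both lists now share the schema (memberId, search_type),
# so the union is a plain concatenation.
union = pv1 + search1
```

Each input gets its own transformation, and the union itself becomes trivial once the schemas agree.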

In Cubert Script, the UNION pattern is accomplished by using multiple MAP { } code blocks in the same job. The pageviews and searches example would be written as follows:

JOB "example of UNION PATTERN"
        REDUCERS 100;
        MAP {
                pv = LOAD "pageviews.avro" USING AVRO; // memberId:LONG, page_key:STRING
                dimTable = LOAD CACHED "dimTable" USING AVRO; // page_key:STRING, search_type: STRING
                pv1 = HASH-JOIN pv BY page_key, dimTable BY page_key;
                mbr_search = FROM pv1 GENERATE pv___memberId AS memberId, dimTable___search_type AS search_type;
        }
        MAP {
                search = LOAD "searches.avro" USING AVRO; //  memberId:LONG, query:STRING
                mbr_search = FROM search GENERATE memberId, CASE (query MATCHES "*people*", "PEOPLE", true, "OTHER") AS search_type;
        }
        SHUFFLE mbr_search PARTITIONED ON memberId;
        REDUCE {
                // reduce operators
        }
        STORE output INTO "output" USING AVRO;
END

The above is called the multi-mapper idiom. Notice that the last block in each mapper has the same name (mbr_search in the example). This is a requirement: the Cubert compiler reports an error if the last blocks in the mappers do not have identical names.

In the job above, we applied a simple SHUFFLE to the unioned data. We could instead have used another shuffle operator (BLOCKGEN, CUBE-ADDITIVE or CUBE-COUNT-DISTINCT), or stored the data directly to the filesystem (i.e. a map-only job).
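The multi-mapper idiom followed by a SHUFFLE can be pictured with the Python simulation below. It is a conceptual sketch only and does not reflect how Cubert is implemented. The function `run_multi_mapper`, its parameters and the hash-modulo partitioning are assumptions made for illustration.

```python
from collections import defaultdict


def run_multi_mapper(mappers, partition_key, num_reducers):
    """Simulate several MAP blocks feeding a single shuffle.

    `mappers` is a list of (last_block_name, rows) pairs, one per MAP block.
    """
    # Mirror the naming rule: every mapper must end in the same block name.
    names = {name for name, _ in mappers}
    if len(names) != 1:
        raise ValueError(
            f"last block names differ across mappers: {sorted(names)}"
        )

    # Route every row, whichever mapper produced it, to a reducer chosen
    # by its partition key, so equal keys always meet in one partition.
    partitions = defaultdict(list)
    for _, rows in mappers:
        for row in rows:
            reducer = hash(row[partition_key]) % num_reducers
            partitions[reducer].append(row)
    return dict(partitions)


# Hypothetical outputs of the two MAP blocks from the job above.
from_pageviews = [
    {"memberId": 1, "search_type": "PEOPLE"},
    {"memberId": 2, "search_type": "OTHER"},
]
from_searches = [
    {"memberId": 1, "search_type": "PEOPLE"},
    {"memberId": 3, "search_type": "OTHER"},
]

partitions = run_multi_mapper(
    [("mbr_search", from_pageviews), ("mbr_search", from_searches)],
    partition_key="memberId",
    num_reducers=2,
)
```

Rows for the same memberId land in the same partition no matter which mapper emitted them, which is what lets the reducer treat the two inputs as a single dataset.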