Shuffle Operators

SHUFFLE

The SHUFFLE operator appears between the map and reduce phases and requires partition keys and sort keys for shuffling the input data. If the sort keys are not specified, they default to the partition keys. Optionally, the SHUFFLE operator can take aggregation functions, along with the corresponding output column names and types, to be applied as part of the combiner.

    // SHUFFLE the data on the given partition and sort keys
    SHUFFLE mapoutput PARTITIONED ON memberId SORTED ON memberId;

    // SHUFFLE the data after applying the aggregations in the combiner
    SHUFFLE datablock PARTITIONED ON dim0, dim1 SORTED ON dim0, dim1 AGGREGATES SUM(sum) AS sum:Long;
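
For context, the sketch below shows how a SHUFFLE statement typically sits between a MAP block and the final STORE of a job, following the same script shape as the CUBE examples later in this section. The input path, the clicks column, and the output path are illustrative assumptions rather than part of an existing script.

MAP {
    // load the fact table in the mapper (path and schema are hypothetical)
    mapoutput = LOAD "$inputFactTable" USING AVRO;
}
// shuffle on memberId and pre-aggregate the hypothetical clicks column in the combiner
SHUFFLE mapoutput PARTITIONED ON memberId SORTED ON memberId AGGREGATES SUM(clicks) AS clicks:LONG;
STORE mapoutput INTO "$output" USING AVRO;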

PRECONDITIONS: None.

BLOCKGEN

Please check out the Partitioned Blocks and Co-partitioned Blocks sections first for an understanding of the Cubert Block concepts.

The BLOCKGEN shuffle operator creates blocks from the output of the mappers. The blocks are created based on one of the following:

  • the number of rows
  • the size of the block
  • the index of another BLOCKGEN dataset

In addition, the BLOCKGEN operator takes the partition keys and, optionally, the sort keys for creating the blocks. The sort keys default to the partition keys if not specified. Optionally, this operator can apply a DISTINCT on the data after the shuffle, but before creating the blocks.

    // creates blocks each with at most 10000 rows
    BLOCKGEN a BY ROW 10000 PARTITIONED ON member_sk, country_sk;

    // creates blocks each with at most 10MB in size
    BLOCKGEN a BY SIZE 10000000 PARTITIONED ON member_sk;

    // creates blocks that have the same index as that of the input index
    BLOCKGEN a BY INDEX "output/memberBlockgen" PARTITIONED ON memberId SORTED ON memberId, search_type;

    // creates blocks after applying a DISTINCT on the reducer input, but before creating the blocks
    BLOCKGEN DISTINCT data BY ROW 25000 PARTITIONED ON ContextId SORTED ON ContextId;
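
To tie these examples together, the sketch below places a BLOCKGEN inside a complete job whose blocks are stored at the path referenced by the BY INDEX example above. The input path and the use of the RUBIX storage format on STORE are assumptions (RUBIX appears only as a loader in the CUBE example below), so treat this as an illustrative sketch rather than a canonical recipe.

MAP {
    // load the raw data to be partitioned into blocks (path is hypothetical)
    data = LOAD "$inputFactTable" USING AVRO;
}
// create blocks of at most 25000 rows, partitioned and sorted on memberId
BLOCKGEN data BY ROW 25000 PARTITIONED ON memberId SORTED ON memberId;
// store the blocks so that a later job can run BLOCKGEN ... BY INDEX "output/memberBlockgen"
STORE data INTO "output/memberBlockgen" USING RUBIX;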

PRECONDITIONS: None.

CUBE

CUBE is also a shuffle operator; it performs the cube computation for additive and partitioned-additive aggregation functions. The advantage of this shuffle operator over the basic CUBE operator is that it abstracts away the multiple operators needed for the computation. Specifically, this single operator is equivalent to a CUBE on the mapper, followed by a shuffle and combiner, and a group by and aggregate on the reducer. The following examples demonstrate the simplicity of this operator.

MAP {
    a = LOAD "$inputFactTable" USING AVRO;
}
CUBE a BY dim0, dim1 AGGREGATES COUNT(memberId) AS sum:long GROUPING SETS (dim0), (dim1);
// compute full cube as: CUBE a BY dim0, dim1 AGGREGATES COUNT(memberId) AS sum:long;
STORE a INTO "$output" USING AVRO;

If needed, other operators can be run on the output of CUBE on the reducer:

MAP {
    a = LOAD "$inputFactTable" USING AVRO;
}
CUBE a BY dim0, dim1 AGGREGATES COUNT(memberId) AS sum:long GROUPING SETS (dim0), (dim1);
REDUCER {
    output = FILTER a BY dim0 == 1;
}
STORE output INTO "$output" USING AVRO;

This operator can also compute count distinct aggregates by specifying the measure column as the partition key.

MAP {
    cube = LOAD "$inputFactTable" USING RUBIX;
}
// computes the full OLAP cube
CUBE cube BY PageKey, ProductPageKeyGroup, Locale, Platform PARTITIONED ON ContextId
        AGGREGATES COUNT_DISTINCT(ContextId) AS contextCount:LONG;
STORE cube INTO "$output" USING AVRO;

PRECONDITIONS: None.