Operator Preconditions

At its core, Cubert MapReduce Script is a physical language: it gives us the freedom and the power to structure the computation as we see fit, but as a consequence it also becomes our responsibility to structure the computation in a way that is meaningful. The primary way to ensure that we are using Cubert operators correctly is to satisfy the Operator Preconditions. A precondition comprises two pieces of information: how the data is partitioned, and how the data is sorted. Each Cubert operator specifies the precondition it expects of its input data, and as Cubert Script users we have to ensure that those preconditions are actually met when we use that operator. The Cubert Operators section lists all the operators supported by Cubert and, for each operator, specifies the preconditions it expects.

We will start this section with an example to understand the concepts of partition key and sort key based preconditions.

Example: Preconditions for CUBE operator

In the previous section (Aggregation: Cube and Grouping Sets) we noted that in order to compute Partitioned-Additive aggregates in the CUBE operator, the input data must be sorted and partitioned on the measure column (see the PRECONDITION in CUBE operator documentation). In this section, we will run the CUBE operator on input data that violates these preconditions in different ways and see what happens.

Precondition Error #1: Data is not partitioned

Let's make the first attempt by reading the data directly from Avro files (the data is neither partitioned nor sorted), as follows:

// script1.cmr
JOB "CUBE operator on AVRO data"
        REDUCERS 1;
        MAP {
                data = LOAD "$input" USING AVRO; // schema: dim0:INT, dim1:INT, memberId:LONG
        }
        // PRECONDITION ERROR. The data is not partitioned on memberId
        CUBE data BY dim0, dim1 INNER memberId AGGREGATES COUNT_DISTINCT(memberId) AS count;
        STORE data INTO "$output" USING AVRO;
END

The data is not partitioned on memberId when we call the CUBE operator, and indeed the cubert command will point out this error during compilation.

> $CUBERT/bin/cubert script1.cmr
Analyzing job [CUBE operator on AVRO data]...
ERROR: PreconditionException [INVALID_PARTITION_KEYS] Found=null. Expected=[memberId]
At:     [MAP] CUBE data BY dim0, dim1 PARTITIONED ON memberId AGGREGATES COUNT_DISTINCT(memberId) AS count;

Cannot compile cubert script. Exiting.

The above tells us that Cubert cannot compile this script because: (a) the CUBE operator requires that the data is partitioned on memberId, and (b) the data we actually provided was not partitioned at all.

BLOCKGEN is the process that partitions the data (see BLOCKGEN); we should use it to partition the data before calling the CUBE operator.

Precondition Error #2: Data incorrectly partitioned

This time we will add a job to partition the data (using BLOCKGEN), but to make things interesting we will partition it on the wrong column.

// script2.cmr
JOB "partition the data (incorrectly)"
        REDUCERS 1;
        MAP {
                data = LOAD "$input" USING AVRO;
        }
        BLOCKGEN data BY ROW 1000 PARTITIONED ON dim0; // <== PROBLEM. We should have been partitioning on memberId
        STORE data INTO "tmp" USING RUBIX;
END

JOB "CUBE operator on RUBIX data"
        REDUCERS 1;
        MAP {
                data = LOAD "tmp" USING RUBIX;
        }
        // PRECONDITION ERROR. The data is not partitioned on memberId (it is partitioned on dim0)
        CUBE data BY dim0, dim1 PARTITIONED ON memberId AGGREGATES COUNT_DISTINCT(memberId) AS count;
        STORE data INTO "$output" USING AVRO;
END

And indeed the precondition error will be reported by cubert:

> $CUBERT_HOME/bin/cubert script2.cmr
Analyzing job [CUBE operator on RUBIX data]...
ERROR: PreconditionException [INVALID_PARTITION_KEYS] Found=[dim0]. Expected=[memberId]
At:     [MAP] CUBE data BY dim0, dim1 PARTITIONED ON memberId AGGREGATES COUNT_DISTINCT(memberId) AS count;

Cannot compile cubert script. Exiting.

Precondition Error #3: Data incorrectly sorted

This time we will partition the data correctly, but sort it on different columns.

// script3.cmr
JOB "partition the data (correct partition, but wrong sort key)"
        REDUCERS 1;
        MAP {
                data = LOAD "$input" USING AVRO;
        }
        BLOCKGEN data BY ROW 1000 PARTITIONED ON memberId SORTED ON dim0; // <== PROBLEM. We should have been sorting on memberId
        STORE data INTO "tmp" USING RUBIX;
END

// the second job is the same as in script2.cmr

This time we will get a different precondition error:

> $CUBERT/bin/cubert script3.cmr
Analyzing job [CUBE operator on AVRO data]...
ERROR: PreconditionException [INVALID_SORT_KEYS] Found=[dim0]. Expected=[memberId]
At:     [MAP] CUBE data BY dim0, dim1 PARTITIONED ON memberId AGGREGATES COUNT_DISTINCT(memberId) AS count;

Cannot compile cubert script. Exiting.
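
For completeness, the following is a sketch of what a script that satisfies both preconditions might look like. It reuses only the syntax already shown in the scripts above and changes the SORTED ON clause to memberId, as the comment in script3.cmr suggests. The file name script4.cmr and the first job's name are illustrative and not part of the original examples, and the compiler output for this script is not shown here.

```
// script4.cmr (hypothetical file name, for illustration only)
JOB "partition and sort the data on memberId"
        REDUCERS 1;
        MAP {
                data = LOAD "$input" USING AVRO;
        }
        // partition AND sort on memberId, which is what the CUBE precondition expects
        BLOCKGEN data BY ROW 1000 PARTITIONED ON memberId SORTED ON memberId;
        STORE data INTO "tmp" USING RUBIX;
END

JOB "CUBE operator on RUBIX data"
        REDUCERS 1;
        MAP {
                data = LOAD "tmp" USING RUBIX;
        }
        // the input should now be both partitioned and sorted on memberId
        CUBE data BY dim0, dim1 PARTITIONED ON memberId AGGREGATES COUNT_DISTINCT(memberId) AS count;
        STORE data INTO "$output" USING AVRO;
END
```

With this change, the partition keys and the sort keys of the data reaching the CUBE operator should both be [memberId], which matches the Expected values reported in the error messages above.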

Inspecting Operator Precondition in the Script

We can look at the partition and sort keys of the data before and after each operator in the Cubert script by setting the -d (--debug) flag on the command line. The following is an excerpt of the debug output for script3.cmr above (the one with incorrect sort keys).

Analyzing job [partition the data (incorrectly)]...
---------------------------------------------
[REDUCE] BLOCKGEN data BY ROW 1000 PARTITIONED ON memberId SORTED ON dim0;

Precondition for data
        Schema: [INT dim0, INT dim1, INT memberId, STRING date]
        Partition Keys: [memberId]
        Sort Keys:      [memberId]

Post Condition
        Schema: [INT dim0, INT dim1, INT memberId, STRING date]
        Partition Keys: [memberId]
        Sort Keys:      [dim0]
Analyzing job [CUBE operator on AVRO data]...
---------------------------------------------
[MAP] CUBE data BY dim0, dim1 PARTITIONED ON memberId AGGREGATES COUNT_DISTINCT(memberId) AS count:LONG;

Precondition for data
        Schema: [INT dim0, INT dim1, INT memberId, STRING date]
        Partition Keys: [memberId]
        Sort Keys:      [dim0]

Post Condition
        ERROR
ERROR: PreconditionException [INVALID_SORT_KEYS] Found=[dim0]. Expected=[memberId]
At:     [MAP] CUBE data BY dim0, dim1 PARTITIONED ON memberId AGGREGATES COUNT_DISTINCT(memberId) AS count;

Cannot compile cubert script. Exiting.

For each operator encountered in the script, debug mode prints the schema, partition keys and sort keys of the input data (the precondition), as well as the schema, partition keys and sort keys of the data after the operator (the postcondition).