Cubert Map-Reduce Language Reference

Structure of a Cubert Program

A program written in Cubert Script consists of an optional header section followed by the job definition section:

<< header section (optional) >>

<< job definition section >>

Note

Cubert Script uses Java-style comments (// and /* .. */).

Structure of the Header Section

The header section consists of three kinds of commands:

  • REGISTER command: to add custom jars to the distributed cache and to the classpath of the mappers and reducers.
  • SET command: to assign Hadoop-specific configuration parameters.
  • FUNCTION command: to provide constructor arguments and/or alias names for UDFs.
REGISTER "myUDFJar.jar";
REGISTER "/usr/lib/somejar.jar";

// note that the 'values' are written as Strings
SET mapred.child.java.opts  "-Xmx1G -Djava.net.preferIPv4Stack=true -Duser.timezone=America/Los_Angeles";
SET mapred.map.tasks.speculative.execution "true";
SET mapred.reduce.tasks.speculative.execution "true";
SET mapred.job.queue.name "marathon";

// defining the constructor arguments for the UDF (but not providing an alias name)
FUNCTION com.linkedin.dwh.udf.filter.IsTestMemberId("input/etl/testMemberId.txt");

// defining constructor arguments as well as an alias name
FUNCTION isCrawler com.linkedin.dwh.udf.lookup.WATBotCrawlerLookup("input/etl/bot.properties");

// defining an alias name for a UDF without constructor arguments
FUNCTION epoch com.linkedin.dwh.udf.date.epochToFormat();

The REGISTER command first looks for the jar in the local filesystem. If found, it copies the jar to a temporary folder in HDFS and adds it to the classpath. Otherwise, it assumes that the jar is already present in HDFS at the specified location.

The SET command accepts two parameters: the property name and the value (as a String). This command assigns these properties to the Hadoop job configuration.

The FUNCTION command can be used to define an alias for a UDF (which can then be used in the FROM..GENERATE operator). Additionally, it can be used to provide constructor arguments to the UDF. UDFs are fully compatible with the Pig API for defining functions (via the EvalFunc super-class).
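As an illustration, the isCrawler alias defined above could be invoked from within a job. This is a hedged sketch; the relation and column names (weblog, pageKey, userAgent) are hypothetical:

// use the 'isCrawler' alias inside a FROM..GENERATE operator
flagged = FROM weblog GENERATE pageKey, isCrawler(userAgent) AS isBot;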

Structure of a Job

There are two kinds of jobs: Map-Reduce jobs and Refresh-Dictionary jobs. In this section we will look at Map-Reduce jobs only; the Refresh-Dictionary job is discussed later in dictionary-job.

The general pattern of a Map-Reduce job is as follows:

JOB "name of this job"
        REDUCERS <int: number of reducers>;
        MAP {
                << load command >>
                << operators >>
        }
        << shuffle command >>
        REDUCE {
                << operators >>
        }
        << store command >>
END

It is also possible to write a Map-only job (without any reducer) as follows:

JOB "name of this job"
        // we do not need to specify the number of reducers
        MAP {
                << load command >>
                << operators >>
        }
        << store command >>
END
  • Each job must be provided with a name.
  • REDUCERS: Each job must specify the number of reducers. For map-only jobs, this can be omitted, or explicitly set to 0.
  • LOAD: Each MAP code block must start with a load command. See Input/Output Operators for the specification of this command.
  • STORE: Each job must end with a store command. See Input/Output Operators for the specification of this command.
  • SHUFFLE: In a Map-Reduce job, there must be a shuffle command (see Shuffle Operators) as well as a REDUCE code block.
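Putting these rules together, a minimal map-only job might look like this (a sketch; the paths and schema are illustrative):

JOB "pass words through as text"
        MAP {
                // map-only: no REDUCERS line, no shuffle command, no REDUCE block
                data = LOAD "input.txt" USING TEXT("schema": "STRING line");
        }
        STORE data INTO "output" USING TEXT();
END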

It is possible to specify multiple mappers in a single job. See multi-mapper for details.

Word Count in Cubert

Let us now walk through the word count script we discussed in the previous section (reproduced below).

JOB "word count job"
    REDUCERS 10;
    MAP {
        // load the input data set as a TEXT file
        input = LOAD "$CUBERT_HOME/examples/words.txt" USING TEXT("schema": "STRING word");
        // add a column to each tuple
        with_count = FROM input GENERATE word, 1 AS count;
    }
    // shuffle the data and also invoke combiner to aggregate on map-side
    SHUFFLE with_count PARTITIONED ON word AGGREGATES COUNT(count) AS count;
    REDUCE {
        // at the reducers, sum the counts for each word
        output = GROUP with_count BY word AGGREGATES SUM(count) AS count;
    }
    // store the output using TEXT format
    STORE output INTO "output" USING TEXT();
END

Map Phase

  • The Map phase starts with the load command. In this case, we are loading $CUBERT_HOME/examples/words.txt as a text file. We also specify the schema of the input.
  • The LOAD command supports loading daily/hourly data by providing time ranges, filenames with #LATEST or *, or multiple files.
  • In addition to the TEXT format, Cubert can also load the AVRO and RUBIX file formats. For these two formats, it is not required to supply the schema (Cubert automatically fetches the schema from the data files); see the sketch after this list.
  • After the load command, there is one operator that adds a count column to each row.
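For example, Avro data could be loaded without specifying a schema. In this hedged sketch, the path /data/events.avro is hypothetical:

// the schema is fetched automatically from the Avro data files
events = LOAD "/data/events.avro" USING AVRO();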

Shuffle Phase

  • In this example, we use the basic SHUFFLE operator. Cubert supports other shuffle operators as well: BLOCKGEN, CUBE-COUNT-DISTINCT and CUBE-ADDITIVE.
  • The shuffle operator specifies that the data (from the mappers) must be partitioned on the word column.
  • The shuffle operator also specifies a Combiner (via the AGGREGATES clause) to sum the count values.
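To see what the AGGREGATES clause buys us, compare the two forms below. This is a hedged sketch, assuming SHUFFLE may also be written without an AGGREGATES clause:

// no combiner: every (word, 1) tuple is sent over the network
SHUFFLE with_count PARTITIONED ON word;

// with a map-side combiner: partial counts are aggregated before the shuffle
SHUFFLE with_count PARTITIONED ON word AGGREGATES COUNT(count) AS count;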

Reduce Phase

  • The input to the operators in this phase is the relation that was shuffled (with_count).

Store Command

  • The output is stored in text format in the specified folder.
  • Cubert can store data in the AVRO and RUBIX formats as well.
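For instance, the store command in the example could be swapped for an Avro store (a sketch; the output path is illustrative):

STORE output INTO "output" USING AVRO();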

Note

About Variable Scope

The variable names within MAP and REDUCE code blocks are not visible outside.

There are, however, two exceptions: (1) the last variable assigned (on the LHS) in the MAP block is visible to the SHUFFLE command, and this same variable name is also visible to the REDUCE commands; (2) the last variable assigned (on the LHS) in the REDUCE block is visible to the STORE command.
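This is why, in the word count job above, SHUFFLE can refer to with_count and STORE can refer to output. An illustrative sketch of what is and is not visible:

MAP {
    input = LOAD "$CUBERT_HOME/examples/words.txt" USING TEXT("schema": "STRING word");
    with_count = FROM input GENERATE word, 1 AS count;  // last assignment: visible to SHUFFLE
}
// SHUFFLE input ... would fail: 'input' is not visible outside the MAP block
SHUFFLE with_count PARTITIONED ON word AGGREGATES COUNT(count) AS count;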

While the Cubert Script code above is already a very concise representation of the Word Count problem, as a matter of interest, the idiomatic way of writing it in Cubert is even more concise (and a lot faster)!

JOB "idiomatic word count program (even more concise!)"
        REDUCERS 10;
        MAP {
                input = LOAD "words.txt" USING TEXT("schema": "STRING word");
        }
        CUBE input BY word AGGREGATES COUNT(word) AS count GROUPING SETS (word);
        STORE input INTO "output" USING TEXT();
END

How does this actually work? Let's study the Cubert core concepts first!