In a small database, much of this would be trivial using aggregate functions (SUM(), COUNT(), etc.). As the volume grows, one often precalculates these metrics, which brings its own set of consistency challenges. As one outgrows a database, as GBIF has, new mechanisms are needed to manage these metrics. The features of DataCube that make it attractive to us are:
- A manageable process to modify the cube structure
- A higher level API to develop against
- Ability to rebuild the cube with a single pass over the source data
For this tutorial we will consider the source data as classical DarwinCore occurrence records, where each record represents the metadata associated with a species observation event, e.g.:
ID, Kingdom, ScientificName, Country, IsoCountryCode, BasisOfRecord, CellId, Year
1, Animalia, Puma concolor, Peru, PE, Observation, 13245, 1967
2, Plantae, Abies alba, Spain, ES, Observation, 3637, 2010
3, Plantae, Abies alba, Spain, ES, Observation, 3638, 2010
Suppose the following metrics are required, each of which is termed a rollup in OLAP:
- Number of records per country
- Number of records per kingdom
- Number of records georeferenced / not georeferenced
- Number of records per kingdom per country
- Number of records georeferenced / not georeferenced per country
- Number of records georeferenced / not georeferenced per kingdom
- Number of records georeferenced / not georeferenced per kingdom per country
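To make the term concrete, here is a minimal in-memory sketch of these seven rollups, counted over records like the sample above. It does not use the DataCube API: the names RollupSketch, Occurrence and count, and the "dimension=value" key format, are invented for illustration, and a non-null cell ID is assumed to mean that a record is georeferenced.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory illustration of the seven rollups; hypothetical, not the DataCube API. */
class RollupSketch {

  /** Minimal occurrence record; cellId is null when there are no coordinates (assumption). */
  static final class Occurrence {
    final String kingdom;
    final String isoCountryCode;
    final Integer cellId;

    Occurrence(String kingdom, String isoCountryCode, Integer cellId) {
      this.kingdom = kingdom;
      this.isoCountryCode = isoCountryCode;
      this.cellId = cellId;
    }
  }

  /** Each rollup is a subset of the three dimensions. */
  static final List<List<String>> ROLLUPS = Arrays.asList(
    Arrays.asList("country"),
    Arrays.asList("kingdom"),
    Arrays.asList("georeferenced"),
    Arrays.asList("country", "kingdom"),
    Arrays.asList("country", "georeferenced"),
    Arrays.asList("kingdom", "georeferenced"),
    Arrays.asList("country", "kingdom", "georeferenced"));

  /** Counts every record once per rollup, keyed by e.g. "country=ES|kingdom=Plantae". */
  static Map<String, Long> count(List<Occurrence> records) {
    Map<String, Long> counts = new TreeMap<String, Long>();
    for (Occurrence r : records) {
      // The coordinates of this record, one value per dimension
      Map<String, String> coords = new LinkedHashMap<String, String>();
      coords.put("country", r.isoCountryCode);
      coords.put("kingdom", r.kingdom);
      coords.put("georeferenced", String.valueOf(r.cellId != null));
      for (List<String> rollup : ROLLUPS) {
        StringBuilder key = new StringBuilder();
        for (String dimension : rollup) {
          if (key.length() > 0) {
            key.append('|');
          }
          key.append(dimension).append('=').append(coords.get(dimension));
        }
        counts.merge(key.toString(), 1L, Long::sum);
      }
    }
    return counts;
  }

  public static void main(String[] args) {
    List<Occurrence> sample = Arrays.asList(
      new Occurrence("Animalia", "PE", 13245),
      new Occurrence("Plantae", "ES", 3637),
      new Occurrence("Plantae", "ES", 3638));
    for (Map.Entry<String, Long> e : count(sample).entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}
```

Every record contributes to all seven rollups in a single pass, which is the property that allows the cube to be rebuilt with one scan of the source data.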
Given the requirements above, this can be translated into a cube definition with 3 dimensions, and 7 rollups as follows:
/**
 * The cube definition (package access only).
 * Dimensions are Country, Kingdom and Georeferenced with counts available for:
 *
 * - Country (e.g. number of records in DK)
 * - Kingdom (e.g. number of animal records)
 * - Georeferenced (e.g. number of records with coordinates)
 * - Country and kingdom (e.g. number of plant records in the US)
 * - Country and georeferenced (e.g. number of records with coordinates in the UK)
 * - Kingdom and georeferenced (e.g. number of animal records with coordinates)
 * - Country and kingdom and georeferenced (e.g. number of bacteria records with coordinates in Spain)
 *
 * TODO: write public utility exposing a simple API enabling validated read/write access to cube.
 */
class Cube {
  // no id substitution
  static final Dimension<String> COUNTRY = new Dimension<String>("dwc:country", new StringToBytesBucketer(), false, 2);
  // id substitution applies
  static final Dimension<String> KINGDOM = new Dimension<String>("dwc:kingdom", new StringToBytesBucketer(), true, 7);
  // no id substitution
  static final Dimension<Boolean> GEOREFERENCED = new Dimension<Boolean>("gbif:georeferenced", new BooleanBucketer(), false, 1);
  // Singleton instance of the cube
  static final DataCube<LongOp> INSTANCE = newInstance();

  // Not for instantiation
  private Cube() {
  }

  /**
   * Creates the cube.
   */
  private static DataCube<LongOp> newInstance() {
    // The dimensions of the cube
    List<Dimension<?>> dimensions = ImmutableList.<Dimension<?>>of(COUNTRY, KINGDOM, GEOREFERENCED);
    // The way the dimensions are "rolled up" for summary counting
    List<Rollup> rollups =
      ImmutableList.of(new Rollup(COUNTRY),
        new Rollup(KINGDOM),
        new Rollup(GEOREFERENCED),
        new Rollup(COUNTRY, KINGDOM),
        new Rollup(COUNTRY, GEOREFERENCED),
        new Rollup(KINGDOM, GEOREFERENCED),
        // more than 2 requires special syntax
        new Rollup(ImmutableSet.of(new DimensionAndBucketType(COUNTRY), new DimensionAndBucketType(KINGDOM),
          new DimensionAndBucketType(GEOREFERENCED))));
    return new DataCube<LongOp>(dimensions, rollups);
  }
}
In this code, we are making use of ID substitution for the kingdom. ID substitution is an inbuilt feature of DataCube whereby an auto-generated ID is used to substitute verbose coordinates (a value for a dimension). This is an important feature to help improve performance as coordinates are used to construct the cube lookup keys, which translate into the key used for the HBase table. The substitution is achieved by using a simple table holding a running counter and a mapping table holding the field-to-id mapping. When inserting data into the cube, the counter is incremented (with custom locking to support concurrency within the cluster), the mapping is stored, and the counter value used as the coordinate. When reading, the mapping table is used to construct the lookup key.
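The idea can be illustrated with a small in-memory sketch. This is not DataCube's own identifier service: the class IdSubstitutionSketch and its methods are hypothetical, a HashMap stands in for the mapping table, a long field stands in for the counter table, and synchronized stands in for the cluster-wide locking. It also assumes that an ID is allocated only the first time a value is seen.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for the counter and mapping tables; hypothetical, not the DataCube API. */
class IdSubstitutionSketch {

  // Stands in for the table holding the running counter
  private long counter = 0;
  // Stands in for the table holding the value-to-id mapping
  private final Map<String, Long> mapping = new HashMap<String, Long>();

  /** Write path: returns the existing id, or allocates the next counter value and stores the mapping. */
  synchronized long getOrCreateId(String value) {
    Long existing = mapping.get(value);
    if (existing != null) {
      return existing;
    }
    long id = ++counter;
    mapping.put(value, id);
    return id;
  }

  /** Read path: looks up the id without allocating; returns null if the value was never written. */
  synchronized Long lookup(String value) {
    return mapping.get(value);
  }

  public static void main(String[] args) {
    IdSubstitutionSketch ids = new IdSubstitutionSketch();
    System.out.println("Animalia -> " + ids.getOrCreateId("Animalia"));
    System.out.println("Plantae  -> " + ids.getOrCreateId("Plantae"));
    System.out.println("Animalia -> " + ids.getOrCreateId("Animalia")); // same id as the first call
    System.out.println("Fungi    -> " + ids.lookup("Fungi"));           // never written
  }
}
```

The short numeric ID, rather than the full string, is what would end up in the row key, so keys stay compact however verbose the original values are.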
With the cube defined, we are ready to populate it. One could simply iterate over the source data and populate the cube with the likes of the following:
DataCubeIo<LongOp> dataCubeIo = setup(Cube.INSTANCE); // omitted for brevity
dataCubeIo.writeSync(new LongOp(1),
  new WriteBuilder(Cube.INSTANCE)
    .at(Cube.COUNTRY, "ES") // for example; an ISO code, since the dimension is 2 bytes wide
    .at(Cube.KINGDOM, "Animalia")
    .at(Cube.GEOREFERENCED, true)
);
However, one should consider how to handle the following inevitable scenarios:
- A new dimension or rollup is to be added to the running cube
- Changes to the source data have occurred without the cube being notified (e.g. through a batch load, or missing notifications due to messaging failures)
- Some disaster recovery requiring a cube rebuild
To handle this when using HBase as the cube storage engine, we make use of the inbuilt backfill functionality. Backfilling is a multistage process:
1. A snapshot of the live cube is taken and stored in a snapshot table
2. An offline cube is calculated from the source data and stored in a backfill table
3. The snapshot and live cube are compared to determine the changes that were accepted in the live cube during the rebuilding process (stage 2). These changes are then applied to the backfill table
4. The backfill is hot swapped to become the live cube
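The third stage, reconciling the three tables, is the least obvious, so here is a minimal sketch of the arithmetic. It assumes the cube holds simple additive counters and that the reconciliation is done key by key as backfill + (live - snapshot). The class BackfillMergeSketch and its merge method are hypothetical and work on plain maps; the actual DataCube implementation operates on HBase tables and may differ in detail.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrates reconciling a rebuilt cube with writes accepted during the rebuild; hypothetical. */
class BackfillMergeSketch {

  /** Returns backfill + (live - snapshot) for every key present in any of the three tables. */
  static Map<String, Long> merge(Map<String, Long> snapshot, Map<String, Long> live, Map<String, Long> backfill) {
    Map<String, Long> result = new HashMap<String, Long>(backfill);
    Set<String> keys = new HashSet<String>(live.keySet());
    keys.addAll(snapshot.keySet());
    for (String key : keys) {
      // What the live cube accepted after the snapshot was taken
      long delta = live.getOrDefault(key, 0L) - snapshot.getOrDefault(key, 0L);
      if (delta != 0) {
        result.merge(key, delta, Long::sum);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Long> snapshot = new HashMap<String, Long>();
    snapshot.put("country=ES", 10L);

    Map<String, Long> live = new HashMap<String, Long>();
    live.put("country=ES", 12L); // two writes arrived during the rebuild
    live.put("country=DK", 1L);  // a new key appeared during the rebuild

    Map<String, Long> backfill = new HashMap<String, Long>();
    backfill.put("country=ES", 11L); // count recalculated from the source data

    System.out.println(merge(snapshot, live, backfill));
  }
}
```

In this example the rebuilt count of 11 for Spain is topped up with the 2 writes that arrived while the rebuild was running, giving 13, and the Danish record that only exists in the live cube is carried over.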
This is all handled within DataCube with the exception of stage 2, where we are required to provide a BackfillCallback, the logic responsible for populating the new cube from the source data. The following example illustrates a BackfillCallback using a simple MapReduce job to scan an HBase table for the source data.
/**
 * The callback used from the backfill process to spawn the job to write the new data in the cube.
 */
public class BackfillCallback implements HBaseBackfillCallback {

  // Property keys passed in on the job conf to the Mapper
  static final String TARGET_TABLE_KEY = "gbif:cubewriter:targetTable";
  static final String TARGET_CF_KEY = "gbif:cubewriter:targetCF";
  // Controls the scanner caching size for the source data scan (100-5000 is reasonable)
  private static final int SCAN_CACHE = 200;
  // The source data table
  private static final String SOURCE_TABLE = "dc_occurrence";

  @Override
  public void backfillInto(Configuration conf, byte[] table, byte[] cf, long snapshotFinishMs) throws IOException {
    conf = HBaseConfiguration.create();
    conf.set(TARGET_TABLE_KEY, Bytes.toString(table));
    conf.set(TARGET_CF_KEY, Bytes.toString(cf));
    Job job = new Job(conf, "CubeWriterMapper");
    job.setJarByClass(CubeWriterMapper.class);
    Scan scan = new Scan();
    scan.setCaching(SCAN_CACHE);
    scan.setCacheBlocks(false);
    // we do not want to get bad counts in the cube!
    job.getConfiguration().set("mapred.map.tasks.speculative.execution", "false");
    job.getConfiguration().set("mapred.reduce.tasks.speculative.execution", "false");
    job.setNumReduceTasks(0);
    TableMapReduceUtil.initTableMapperJob(SOURCE_TABLE, scan, CubeWriterMapper.class, null, null, job);
    job.setOutputFormatClass(NullOutputFormat.class);
    try {
      boolean b = job.waitForCompletion(true);
      if (!b) {
        throw new IOException("Unknown error with job. Check the logs.");
      }
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
}
/**
 * The Mapper used to read the source data and write into the target cube.
 * Counters are written to simplify the spotting of issues, so look to the Job counters on completion.
 */
public class CubeWriterMapper extends TableMapper<NullWritable, NullWritable> {

  // TODO: These should come from a common schema utility in the future
  // The source HBase table fields
  private static final byte[] CF = Bytes.toBytes("o");
  private static final byte[] COUNTRY = Bytes.toBytes("icc");
  private static final byte[] KINGDOM = Bytes.toBytes("ik");
  private static final byte[] CELL = Bytes.toBytes("icell");
  // Names for counters used in the Hadoop Job
  private static final String STATS = "Stats";
  private static final String STAT_COUNTRY = "Country present";
  private static final String STAT_KINGDOM = "Kingdom present";
  private static final String STAT_GEOREFERENCED = "Georeferenced";
  private static final String STAT_SKIPPED = "Skipped record";
  private static final String KINGDOMS = "Kingdoms";
  // The batch size to use when writing the cube
  private static final int CUBE_WRITE_BATCH_SIZE = 1000;
  static final byte[] EMPTY_BYTE_ARRAY = new byte[0];

  private DataCubeIo<LongOp> dataCubeIo;

  @Override
  protected void cleanup(Context context) throws IOException, InterruptedException {
    super.cleanup(context);
    // ensure we're all flushed since batch mode
    dataCubeIo.flush();
    dataCubeIo = null;
  }

  /**
   * Utility to read a named field from the row as an Integer.
   */
  private Integer getValueAsInt(Result row, byte[] cf, byte[] col) {
    byte[] v = row.getValue(cf, col);
    if (v != null && v.length > 0) {
      return Bytes.toInt(v);
    }
    return null;
  }

  /**
   * Utility to read a named field from the row as a String.
   */
  private String getValueAsString(Result row, byte[] cf, byte[] col) {
    byte[] v = row.getValue(cf, col);
    if (v != null && v.length > 0) {
      return Bytes.toString(v);
    }
    return null;
  }

  @Override
  protected void map(ImmutableBytesWritable key, Result row, Context context) throws IOException, InterruptedException {
    String country = getValueAsString(row, CF, COUNTRY);
    String kingdom = getValueAsString(row, CF, KINGDOM);
    Integer cell = getValueAsInt(row, CF, CELL);
    WriteBuilder b = new WriteBuilder(Cube.INSTANCE);
    if (country != null) {
      b.at(Cube.COUNTRY, country);
      context.getCounter(STATS, STAT_COUNTRY).increment(1);
    }
    if (kingdom != null) {
      b.at(Cube.KINGDOM, kingdom);
      context.getCounter(STATS, STAT_KINGDOM).increment(1);
      context.getCounter(KINGDOMS, kingdom).increment(1);
    }
    if (cell != null) {
      b.at(Cube.GEOREFERENCED, true);
      context.getCounter(STATS, STAT_GEOREFERENCED).increment(1);
    }
    if (b.getBuckets() != null && !b.getBuckets().isEmpty()) {
      dataCubeIo.writeSync(new LongOp(1), b);
    } else {
      context.getCounter(STATS, STAT_SKIPPED).increment(1);
    }
  }

  // Sets up the DataCubeIO with IdService etc.
  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    super.setup(context);
    Configuration conf = context.getConfiguration();
    HTablePool pool = new HTablePool(conf, Integer.MAX_VALUE);
    IdService idService = new HBaseIdService(conf, Backfill.LOOKUP_TABLE, Backfill.COUNTER_TABLE, Backfill.CF, EMPTY_BYTE_ARRAY);
    byte[] table = Bytes.toBytes(conf.get(BackfillCallback.TARGET_TABLE_KEY));
    byte[] cf = Bytes.toBytes(conf.get(BackfillCallback.TARGET_CF_KEY));
    DbHarness<LongOp> hbaseDbHarness =
      new HBaseDbHarness<LongOp>(pool, EMPTY_BYTE_ARRAY, table, cf, LongOp.DESERIALIZER, idService, CommitType.INCREMENT);
    dataCubeIo = new DataCubeIo<LongOp>(Cube.INSTANCE, hbaseDbHarness, CUBE_WRITE_BATCH_SIZE, Long.MAX_VALUE, SyncLevel.BATCH_SYNC);
  }
}
With the callback written, all that is left to populate the cube is to run the backfill. Note that this process can also be used to bootstrap the live cube for the first time:
// The live cube table
final byte[] CUBE_TABLE = "dc_cube".getBytes();
// Snapshot of the live table used during backfill
final byte[] SNAPSHOT_TABLE = "dc_snapshot".getBytes();
// Backfill table built from the source
final byte[] BACKFILL_TABLE = "dc_backfill".getBytes();
// Utility table to provide a running count for the identifier service
final byte[] COUNTER_TABLE = "dc_counter".getBytes();
// Utility table to provide a mapping from source values to assigned identifiers
final byte[] LOOKUP_TABLE = "dc_lookup".getBytes();
// All DataCube tables use a single column family
final byte[] CF = "c".getBytes();
HBaseBackfill backfill =
  new HBaseBackfill(
    conf,
    new BackfillCallback(), // our implementation
    CUBE_TABLE,
    SNAPSHOT_TABLE,
    BACKFILL_TABLE,
    CF,
    LongOp.LongOpDeserializer.class);
backfill.runWithCheckedExceptions();
While HBase provides the storage for the cube, a backfill could be implemented against any source data, such as from a database over JDBC or from text files stored on a Hadoop filesystem.
Finally, we want to be able to read our cube:
DataCubeIo<LongOp> dataCubeIo = setup(Cube.INSTANCE); // omitted for brevity
Optional<LongOp> result =
  dataCubeIo.get(
    new ReadBuilder(Cube.INSTANCE)
      .at(Cube.COUNTRY, "DK")
      .at(Cube.KINGDOM, "Animalia"));
// need to check if this coordinate combination hit anything in the cube
if (result.isPresent()) {
  LOG.info("Animal records in Denmark: " + result.get().getLong());
}
All the source code for the above is available in the GBIF labs svn.
Many thanks to Dave Revell at UrbanAirship for his guidance.