I am a programmer and architect (the kind that writes code) with a focus on testing and open source; I maintain the PHPUnit_Selenium project. I believe programming is one of the hardest and most beautiful jobs in the world. Giorgio is a DZone MVB and is not an employee of DZone and has posted 636 posts at DZone.

How to shard a cron

09.04.2013

Sharding is a database partitioning technique that distributes Aggregates such as rows or documents across multiple servers. This horizontal partitioning trades some client complexity (queries must include a shard key such as a zip code or a customer id) for the ability to spread the data set over multiple servers, scaling not only the read capacity but also the write capacity.

On the application side, there are several singleton processes, for example cron jobs, that usually run as a single instance over the whole data set. For a certain category of singleton processes we can switch to a shard-like architecture that scales first to multiple processes and, when necessary, to multiple servers.

Step 1: identify the candidate process

Take a look at your crontab or at your process scheduling configuration if you use another infrastructure. Some of the processes are aggregations of data producing statistics, and their work can already be distributed with patterns such as MapReduce.

The processes that are interesting for client-side sharding are those where an operation is performed over every single element of the data set. Each element is an Aggregate and as such does not interact, within a single transaction, with the others. Therefore these processes are intrinsically parallelizable: you only need a way to distribute the load.

Some examples of shard-able processes are:

  • rebuild the data aggregates for a new day for each user
  • perform some consistency checks on every customer order
  • send all pending orders
  • perform a renewal for each user subscription

Step 2: choose a shard key

Once you have identified the aggregates along which to parallelize a lengthy operation, the choice of the shard key will usually be straightforward. This key must be uniformly distributed among the N shards you want to create on the client side. Some examples:

  • the zip code for customers
  • the numerical, sequential id for orders
  • a UUID for uploaded videos
  • the transaction reference number for money transactions
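
As a minimal sketch of how such keys could be mapped to a shard number, the hypothetical helper below (shardOf() does not come from any library) uses the value directly for numerical ids and a CRC32 checksum for string keys such as zip codes or UUIDs. The choice of CRC32 is an assumption made for illustration; any hash that spreads the keys evenly would do.

```php
<?php
/**
 * Hypothetical helper: maps a shard key to a shard number in [0, $shards).
 */
function shardOf($key, int $shards): int
{
    if ($shards < 1) {
        throw new InvalidArgumentException('The number of shards must be positive.');
    }
    if (is_int($key)) {
        // Sequential numerical ids are already spread evenly by the modulo.
        $value = $key;
    } else {
        // String keys (zip codes, UUIDs) are hashed first. The mask keeps the
        // checksum non-negative on 32-bit builds of PHP as well.
        $value = crc32((string) $key) & 0x7fffffff;
    }
    // Normalize so that negative ids still land in [0, $shards).
    return (($value % $shards) + $shards) % $shards;
}

// Count how many of 1000 sequential order ids land on each of 4 shards.
$counts = array_fill(0, 4, 0);
for ($orderId = 1; $orderId <= 1000; $orderId++) {
    $counts[shardOf($orderId, 4)]++;
}
print_r($counts); // every shard receives 250 ids
```

With string keys the split will not be this exact, so it may be worth counting the real keys per shard before settling on one.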

Step 3: divide the work with the shard key

A process before the application of sharding is usually composed of two phases:

  • select all Aggregates that satisfy condition C
  • apply operation O to all the selected Aggregates.

The first operation can sometimes be sharded directly, transforming each of the N processes into:

  • select all Aggregates that satisfy condition C and whose shard key modulo N is equal to this shard's number.
  • apply operation O to all the selected Aggregates.

For example, the first operation for shard 0 of 4 can be accomplished by an SQL query:

SELECT * FROM aggregate_table
WHERE outdated = true        -- condition C
AND aggregate_id % 4 = 0     -- sharding: shard 0 of 4
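
The same query could be parameterized so that a single script serves every shard. The sketch below only builds the SQL text and the parameters to bind; the function name shardQuery() is hypothetical, the table and column names are reused from the query above, and the named placeholders assume a driver that supports them (PDO, for instance).

```php
<?php
/**
 * Hypothetical helper: returns the SQL text and the parameters that select
 * the outdated aggregates belonging to one shard.
 */
function shardQuery(int $shard, int $shards): array
{
    if ($shards < 1 || $shard < 0 || $shard >= $shards) {
        throw new InvalidArgumentException("Invalid shard $shard of $shards.");
    }
    $sql = 'SELECT * FROM aggregate_table'
         . ' WHERE outdated = true'                 // condition C
         . ' AND aggregate_id % :shards = :shard';  // sharding
    return [$sql, ['shards' => $shards, 'shard' => $shard]];
}

// Shard 0 of 4, as in the query above.
list($sql, $params) = shardQuery(0, 4);
echo $sql, PHP_EOL;
```

Each of the N cron entries would then differ only in the shard number it passes to the script.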

Precalculating aggregate_id % 4 and storing it in a column can improve the performance of the query, depending on your database; however, it can make it more difficult to rescale the number of processes. When you switch to 8 or 16 client shards it will be necessary to stop all currently running processes, recalculate the column and start the new batch. Furthermore, the performance of ALTER TABLE is usually not good on large tables, which are exactly the subject of this client sharding technique.
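
When the modulo is instead computed at query time, rescaling only changes the divisor. As a small worked example (plain arithmetic, not tied to any database), the loop below checks where the ids of each of 4 shards end up when the number of processes is doubled to 8.

```php
<?php
// For each id, record its shard under 4 processes and under 8 processes.
$moves = [];
for ($id = 0; $id < 1000; $id++) {
    $old = $id % 4;
    $new = $id % 8;
    $moves[$old][$new] = true;
}

foreach ($moves as $old => $targets) {
    $list = array_keys($targets);
    sort($list);
    echo "old shard $old -> new shards ", implode(', ', $list), PHP_EOL;
}
// old shard 0 -> new shards 0, 4
// old shard 1 -> new shards 1, 5
// old shard 2 -> new shards 2, 6
// old shard 3 -> new shards 3, 7
```

Each old shard splits cleanly into two new ones, so no stored value has to be recomputed; the running processes still have to be stopped before the new batch starts.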

Sometimes we are not able to partition the original data set directly in the database query. The pattern then becomes:

  1. select the id (and the shard key, if different) of all Aggregates that satisfy condition C.
  2. filter this subset by only considering the ids whose value modulo N is equal to this shard's number.
  3. apply operation O to all the selected Aggregates.

For example, I use this second form when running multiple clients against MongoDB. I do not know whether it is possible to query ObjectIds by their modulo, so I resort to selecting all of them and then filtering them on the client side:

$id = (string) $document['_id'];
// only the last 4 hex digits: without substr() the conversion will overflow 32-bit integers
$numericalValue = hexdec(substr($id, -4));
if ($numericalValue % $shards == $shard) {
  // ... apply operation O to this document
}
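
The fragment above can be wrapped into a small function and tried on a handful of identifiers. This is only a sketch: objectIdShard() is a hypothetical name, and the identifiers are made up for illustration rather than taken from a real collection.

```php
<?php
/**
 * Hypothetical helper: shard number for a hexadecimal ObjectId string.
 * Only the last 4 hex digits (16 bits) are converted, so the value always
 * fits in an integer.
 */
function objectIdShard(string $hexId, int $shards): int
{
    return hexdec(substr($hexId, -4)) % $shards;
}

// Made-up identifiers, for illustration only.
$ids = [
    '507f1f77bcf86cd799439011',
    '507f1f77bcf86cd799439012',
    '507f1f77bcf86cd799439013',
    '507f1f77bcf86cd799439014',
];

$shards = 2;
$assigned = [];
for ($shard = 0; $shard < $shards; $shard++) {
    // Each process keeps only the ids that belong to its own shard.
    $assigned[$shard] = array_values(array_filter(
        $ids,
        function ($id) use ($shards, $shard) {
            return objectIdShard($id, $shards) === $shard;
        }
    ));
    echo "shard $shard: ", implode(', ', $assigned[$shard]), PHP_EOL;
}
```

Every id ends up in exactly one shard, which is the property the N processes rely on to avoid working on the same document twice.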

This is only useful under the assumption that it is not the query that takes too much time in the original process, but the application of operation O to all of its results.

Note also that these solutions need well-behaved processes that do not touch each other's data: the filtering is left to the programmer. In general, it is also necessary to guarantee mutual exclusion with the original singleton process. This usually comes up when deploying the battery of N crons: they should not start until the last run of the singleton process has terminated, and the singleton must not be started again afterwards.
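
One possible way to get this mutual exclusion on a single server is a lock file. The sketch below is an assumption, not something the processes described here are known to do: acquireLock() and the lock path are hypothetical, the singleton would have to take the lock exclusively (LOCK_EX), and each shard would take it in shared mode (LOCK_SH), so that shards can run side by side but never together with the singleton. It does not help once the shards move to different servers.

```php
<?php
/**
 * Hypothetical helper: tries to take a non-blocking lock on a lock file.
 * Returns the open handle on success (keep it open while the process works)
 * or null if a conflicting lock is held by another process.
 */
function acquireLock(string $path, int $mode)
{
    $handle = fopen($path, 'c');
    if ($handle === false) {
        throw new RuntimeException("Cannot open lock file $path");
    }
    if (!flock($handle, $mode | LOCK_NB)) {
        fclose($handle);
        return null;
    }
    return $handle;
}

// Assumed path, shared by the singleton cron and by every shard.
$lockPath = sys_get_temp_dir() . '/aggregate_rebuild.lock';

// A shard asks for a shared lock: it is refused while the singleton holds
// its exclusive lock, but other shards do not get in its way.
$lock = acquireLock($lockPath, LOCK_SH);
if ($lock === null) {
    echo 'The singleton process is still running, not starting.', PHP_EOL;
} else {
    // ... apply operation O to the Aggregates of this shard ...
    flock($lock, LOCK_UN);
    fclose($lock);
}
```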

Published at DZone with permission of Giorgio Sironi, author and DZone MVB.
