Tuesday, February 4, 2014

Work started on Storm-Cassandra-CQL!


As I laid out in my previous post, there are a number of motivations to start using CQL.  CQL has better support for batching, conditional updates, and collections. (IMHO)  For those reasons, I've started porting our Trident State implementation to CQL.

The implementation has the same Mapper concept. Simply implement the Mapper interface: map Storm tuples to CQL3 statements.

For example:
   public Statement map(TridentTuple tuple) {
        Update statement = QueryBuilder.update("mykeyspace", "mytable");
        String field = "col1";
        String value = tuple.getString(0);
        Assignment assignment = QueryBuilder.set(field, value);
        statement.with(assignment);
        long t = System.currentTimeMillis() % 10;
        Clause clause = QueryBuilder.eq("t", t);
        statement.where(clause);
        return statement;
    }

(From the ExampleMapper)

The CQL3 statements are then collected and submitted as a batch inside the State implementation.

Below is an example topology:
    public static StormTopology buildTopology() {
        LOG.info("Building topology.");
        TridentTopology topology = new TridentTopology();
        ExampleSpout spout = new ExampleSpout();
        Stream inputStream = topology.newStream("test", spout);
        ExampleMapper mapper = new ExampleMapper();
        inputStream.partitionPersist(new CassandraCqlStateFactory(), 
                                     new Fields("test"), 
                                     new CassandraCqlStateUpdater(mapper));
        return topology.build();

Presently the implementation is *very* simple.  We know we'll need to enhance the batching mechanism.  (e.g. What happens when the size of a batch in Storm exceeds the batch size limit in CQL3?  Bad things. =)

At first glance though, this approach for Storm / Cassandra integration is much simpler than our previous implementation and will allow users to leverage the power and features available in CQL3.  (e.g. We have grand plans to expose / utilize conditional updates to realize incremental state updates from Storm -- more on that later)

I'd encourage people to give it a spin... and submit contributions!
https://github.com/hmsonline/storm-cassandra-cql


No comments: