Friday, February 28, 2014

Big Data : On the Precipice of a Collapse

Before anyone freaks out, I'm talking about a technology collapse, not a market collapse or the steep downhill slope of a hype curve.  But I guess a "technology collapse" doesn't sound much better.  Maybe "technology consolidation" is a better word, but that's not really what I mean.  Oh well, let me explain...

If you have been through introductory computer science courses, you know that different data structures are suited to different applications. With a bit of analysis, you can classify the cost of an operation against a specific structure, and how that cost scales as the size of the data or the input grows. (See Big O Notation if you would like to geek out.) Big Data is no different, except that we usually talk about performance and scaling relative to additional constraints, such as the CAP Theorem.
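To make that concrete, here is a minimal Java sketch (the class and method names are illustrative, not from any library). It answers the same range query against two standard JDK structures. A `TreeMap` finds the start of the range in O(log n) and then walks the k matches. A `HashMap` keeps no key order, so it has to examine all n keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch: the same data in two JDK structures, each suited to different queries.
final class StructureCosts {

    static HashMap<Integer, String> buildHash(int n) {
        HashMap<Integer, String> map = new HashMap<>();
        for (int i = 0; i < n; i++) map.put(i, "v" + i);
        return map;
    }

    static TreeMap<Integer, String> buildTree(int n) {
        return new TreeMap<>(buildHash(n));
    }

    // Range query [lo, hi) on a red-black tree: O(log n) to locate lo, then a walk over the k matches.
    static int countInRangeTree(TreeMap<Integer, String> tree, int lo, int hi) {
        return tree.subMap(lo, hi).size();
    }

    // Same query on a hash table: keys have no order, so all n keys must be examined.
    static int countInRangeHash(Map<Integer, String> hash, int lo, int hi) {
        int count = 0;
        for (int key : hash.keySet()) {
            if (key >= lo && key < hi) count++;
        }
        return count;
    }

    public static void main(String[] args) {
        HashMap<Integer, String> hash = buildHash(1_000);
        TreeMap<Integer, String> tree = buildTree(1_000);
        // Point lookups are cheap in both: expected O(1) for the hash map, O(log n) for the tree.
        System.out.println(hash.get(42) + " " + tree.get(42)); // v42 v42
        System.out.println(countInRangeTree(tree, 10, 15));    // 5
        System.out.println(countInRangeHash(hash, 10, 15));    // 5
    }
}
```

If you need both kinds of query to be fast, you end up holding the same data in both structures.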

So what are our "data structures and algorithms" in the Big Data space?  Well, here is my laundry/reading list:
So what does that have to do with an imminent collapse?  Well, market demands are pushing our systems to ingest an increasing amount of data in a decreasing amount of time, while also making that data immediately available to an increasing variety of queries.

We know from our classes that to accommodate a greater variety of queries, we need different data structures and algorithms to service those queries quickly. That leaves us no choice but to duplicate data across different data structures and to use different tools to query across those systems. This approach is often referred to as "Polyglot Persistence". That worked, but it left the application to orchestrate the writes and queries across the different persistence mechanisms. (painful)
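Here is a minimal sketch of that orchestration burden. It assumes two hypothetical stores behind a made-up `Store` interface; they are in-memory stand-ins, not a real client API. The application fans each write out to every store. When the second store fails, the first has already been written, and nothing rolls it back.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical persistence mechanism: a stand-in for a database, a search index, etc.
interface Store {
    void write(String id, String doc);
    String read(String id);
}

// In-memory stand-in for a real store; failWrites simulates an outage.
final class MemoryStore implements Store {
    private final Map<String, String> data = new HashMap<>();
    private final boolean failWrites;

    MemoryStore(boolean failWrites) { this.failWrites = failWrites; }

    public void write(String id, String doc) {
        if (failWrites) throw new IllegalStateException("store unavailable");
        data.put(id, doc);
    }

    public String read(String id) { return data.get(id); }
}

// With polyglot persistence, the application itself fans every write out to every store.
final class PolyglotWriter {
    private final List<Store> stores;

    PolyglotWriter(List<Store> stores) { this.stores = stores; }

    // Returns how many stores were written before the first failure. Earlier writes are
    // NOT rolled back, so the application is left to reconcile the inconsistency.
    int write(String id, String doc) {
        int written = 0;
        for (Store store : stores) {
            try {
                store.write(id, doc);
                written++;
            } catch (IllegalStateException e) {
                break;
            }
        }
        return written;
    }

    public static void main(String[] args) {
        Store primary = new MemoryStore(false);
        Store index = new MemoryStore(true); // the second store is "down"
        PolyglotWriter writer = new PolyglotWriter(Arrays.asList(primary, index));
        System.out.println(writer.write("DE", "v=15"));                      // 1
        System.out.println(primary.read("DE") + " / " + index.read("DE")); // v=15 / null
    }
}
```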

To alleviate that pain, people are collapsing the persistence mechanisms and interfaces. We already see first-level integrations: people are combining inverted indexes and search/query mechanisms with distributed databases. For example:
  • Titan integrated Elasticsearch.
  • DataStax integrated Solr.
  • Stargate combines Cassandra and Lucene.
This is a natural pairing because the queries you can perform efficiently against a distributed database are constrained by the partitioning and physical layout of the data. The inverted indexes fill the gap, allowing users to query along dimensions that may not align with the data model used in the distributed database. The distributed database can handle the write-intensive traffic, while the inverted indexes handle the read-intensive traffic, likely with some lag in synchronization between the two (though not necessarily).
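Here is a toy Java sketch of the idea. The class is hypothetical, and it tokenizes only on whitespace; Lucene and friends do much more analysis. Records live in a primary map keyed by id, standing in for the partition key. An inverted index maps each term to the ids that contain it, so a query by term does not have to scan every record.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: a primary store keyed by id plus an inverted index over record text.
final class InvertedIndexSketch {
    private final Map<String, String> recordsById = new HashMap<>();    // "partitioned" by id
    private final Map<String, Set<String>> idsByTerm = new HashMap<>(); // term -> ids

    // Assumes each id is inserted once; a real update would also remove stale postings.
    void insert(String id, String text) {
        recordsById.put(id, text);
        for (String term : text.toLowerCase(Locale.ROOT).split("\\s+")) {
            if (!term.isEmpty()) {
                idsByTerm.computeIfAbsent(term, t -> new TreeSet<>()).add(id);
            }
        }
    }

    // Efficient along the partition dimension...
    String getById(String id) {
        return recordsById.get(id);
    }

    // ...and, thanks to the index, along a dimension the primary layout does not support.
    Set<String> search(String term) {
        return idsByTerm.getOrDefault(term.toLowerCase(Locale.ROOT), Collections.emptySet());
    }

    public static void main(String[] args) {
        InvertedIndexSketch idx = new InvertedIndexSketch();
        idx.insert("p1", "Cardiologist Boston");
        idx.insert("p2", "Dermatologist Boston");
        System.out.println(idx.search("boston")); // [p1, p2]
        System.out.println(idx.getById("p2"));    // Dermatologist Boston
    }
}
```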

Tight integration between persistence mechanisms hides from the end user the fact that data was duplicated across stores, but IMHO, we still have a ways to go. What happens if you want to perform an ad hoc ETL against the data? Then you need to fire up Hive or Pig (or Spark and/or Shark), and use a whole different set of infrastructure and a whole other language to accomplish your task.

One can imagine a second- or third-level integration that unifies the interface into "big data": a single language providing search/query capabilities (e.g. Lucene-like queries), structured aggregations (e.g. SQL-like queries), and extract, transform and load functions (e.g. Pig/Hive-like queries), rolled into one cohesive platform capable of orchestrating the operations, functions and queries across the various persistence and execution frameworks.

I'm not quite sure what that looks like. Would it use Storm or Spark as the compute framework, perhaps running on YARN, backed by Cassandra, Elasticsearch and Titan, with pre-aggregation capabilities like those found in Druid?

Who knows?   Food for thought on a Friday afternoon.
Time to grab a beer.

(P.S. A shout out to Philip Klein and John Savage, two professors back at Brown who inspired these musings.)

Thursday, February 13, 2014

Storm and Cassandra : A Three Year Retrospective

We are doing our final edits on our Storm book due out in April.  In reviewing the chapters, I got to thinking through the evolution of our architecture with Storm.  I thought I would capture some of our journey.  Maybe people out there can skip a few epochs of evolution. =)

Kudos again to P. Taylor Goetz for introducing Storm at Health Market Science. When Taylor joined the team, we were well on our way to selecting Cassandra as our persistence mechanism, but we had yet to solve the distributed processing problem. We had varying levels of JMS/JVM sprawl, and we were dealing with all the challenges of transactional processing against those queues (exactly the situation Nathan Marz typically depicts when motivating Storm).

Alongside the JMS/JVM sprawl, we also had Hadoop deployments against Cassandra that we were fairly frustrated with. The Map/Reduce paradigm for analysis seemed very restrictive, and we were spending a lot of time tweaking jobs to balance work across phases (map, shuffle, reduce). It felt like we were shoehorning our problems into M/R. Add the overhead of spinning up a job and waiting for the results, and we wanted something better. Enter Storm.

We had made the decision to go forward with Cassandra, but we didn't see any bridge between Storm and Cassandra -- so we built one.  By December 2011, we had made enough progress on storm-cassandra that it made it into Nathan's talk at the Cassandra Summit, and we started building out our first topologies.

Back in those days, there was no such thing as Trident in Storm.   And given the pain that we first encountered, I'd guess that most of the production uses of Storm did not demand transactional integrity.  I'm guessing that many of those uses only needed "good enough" real-time answers, and likely leveraged Hadoop, lagging somewhat, to correct issues offline.

We didn't like that approach. Our systems are used to make healthcare-related decisions in many large operational environments. We wanted the data to be immediately vendable, and guaranteed. Enter Transactional Topologies.

We started using transactional topologies, getting our feet wet connecting Storm to our JMS queues and birthing storm-jms in the process. In this phase, we felt some of the pain of being on the bleeding edge. The APIs seemed to shift quite a bit, with the coup de grâce coming when Trident was introduced.

Trident added to Storm what we needed: better support for transactional processing and state management. But what was that? Transactional topologies are now deprecated? Ouch. Quick -- everyone over to Trident! (We could have used a heads up here!)

We rapidly ported all of our transactional topologies over to Trident and got acquainted with the new concepts of State.   At the same time, we were advancing our understanding of Cassandra data modeling.

We learned the following, which should be tattooed on everyone working in distributed computing:

  1. Eliminate read-before-write scenarios (never fetch+update, just insert)
  2. Ensure all operations are idempotent (when you insert, overwrite)
  3. When possible, avoid shuffling data around (partitionPersist is your friend)
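Here is why the first two tenets matter. Trident may replay a batch after a failure, so any write can be applied more than once. The hypothetical sketch below uses plain in-memory Java maps, not Cassandra, and replays the same batch twice. The blind overwrite ends in the same state both times. The fetch+update increment double-counts.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory "table" illustrating replay safety; not Cassandra or Trident code.
final class ReplaySafety {

    // Tenets 1 and 2: a blind insert that overwrites. Applying it twice leaves the same state.
    static void upsert(Map<String, Long> table, String key, long value) {
        table.put(key, value);
    }

    // Anti-pattern: read-before-write. Each replay adds delta again.
    static void fetchAndIncrement(Map<String, Long> table, String key, long delta) {
        long current = table.getOrDefault(key, 0L);
        table.put(key, current + delta);
    }

    public static void main(String[] args) {
        Map<String, Long> idempotent = new HashMap<>();
        Map<String, Long> nonIdempotent = new HashMap<>();
        // Simulate the same batch being delivered twice (e.g. a failure followed by a replay).
        for (int attempt = 0; attempt < 2; attempt++) {
            upsert(idempotent, "DE", 15L);
            fetchAndIncrement(nonIdempotent, "DE", 15L);
        }
        System.out.println(idempotent.get("DE"));    // 15
        System.out.println(nonIdempotent.get("DE")); // 30
    }
}
```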

Make sure your processing flow and data model support the above tenets. With our tattoos, we continued to build out our use of Storm throughout 2013.

Walking Upright
Many people tend to compare Storm and Hadoop, with Storm portrayed as a "real-time Hadoop". I believe this shortchanges Storm a bit. Hadoop (v1) runs a specific algorithm across a predefined set of data. Often, to accomplish something useful, you need to string many M/R jobs together. Eventually, you find yourself wanting a higher-level language like Pig, Hive or Cascading. Storm operates at this higher level, and although it is often cast as a framework for "real-time analytics", it is a general data processing layer capable of accommodating fairly sophisticated data flows.

In fact, Storm excels as data flow and orchestration infrastructure. We use it as our data ingestion infrastructure, orchestrating writes across multiple data storage mechanisms (see trident-elasticsearch). It provides the backbone for a solid platform that takes advantage of polyglot persistence.

Future Evolution
The best is yet to come.  Cassandra is churning out new features that make it even more useful for distributed processing.  See my previous post on CQL enhancements.  We created a new project to take advantage of those features. (see storm-cassandra-cql)  It's already getting some traction. (shout out to Robert Lee for contributing the BackingMap implementation, which should be merged shortly)

Also, with Storm's incorporation into Apache and Hortonworks' commitment, we should see a more stable API, more frequent releases, and better synergy with other Apache projects. (YARN, anyone?!)

So, if you are the gambling type, I'd push your chips to the center of the table. Bet on Storm and Cassandra being a powerful pair as demands for bigger, better and faster continue to push those of us at the edge of the envelope. It's anyone's guess what the pair will evolve into, but one can imagine great things.

Thursday, February 6, 2014

Determining if a conditional update was applied w/ CQL Java-Driver

Sorry, I should have included this in my previous post on conditional updates. One of the critical aspects of using conditional updates is determining whether the update was applied. Here is how you do it.

Given our previous update (all fluentized):
Update updateStatement = update(KEYSPACE_NAME, TABLE_NAME);
updateStatement.with(set(VALUE_NAME, 15)).where(eq(KEY_NAME, "DE")).onlyIf(eq(VALUE_NAME, 10));
this.executeAndAssert(updateStatement, "DE", 15);
When we execute the updateStatement with our session and examine the ResultSet returned:
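LOG.debug("EXECUTING [{}]", statement.toString());
ResultSet results = clientFactory.getSession().execute(statement);
// results.all() consumes the ResultSet, so iterate the rows in the outer loop
for (Row row : results.all()) {
   for (ColumnDefinitions.Definition definition : results.getColumnDefinitions().asList()) {
      // getBool assumes boolean columns; here the only column returned is [applied]
      LOG.debug("[{}]=[{}]", definition.getName(), row.getBool(definition.getName()));
   }
}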
You'll notice that you get a ResultSet back that contains one row, with a column named "[applied]". The above code results in:
DEBUG EXECUTING [UPDATE mykeyspace.incrementaltable SET v=15 WHERE k='DE' IF v=10;]
DEBUG [[applied]]=[true]
Thus, to check whether a conditional update was applied, you can use this concise code:
Row row = results.one();
if (row != null)
   LOG.debug("APPLIED?[{}]", row.getBool("[applied]"));

Wednesday, February 5, 2014

Conditional updates with CQL3 java-driver

Within storm-cassandra-cql, I'm building out a state implementation that is capable of making incremental state changes.   (Read-Modify-Write)  It leverages Cassandra 2.0's lightweight transactions, and the ability to perform a conditional update.  (more on the rationale for incremental state changes later)

As I was implementing it, I couldn't find any good examples that showed how to implement conditional updates using the QueryBuilder in the CQL3 java driver.

So, here you go:

        // Now let's conditionally update where it is true
        Update updateStatement = QueryBuilder.update(SalesAnalyticsMapper.KEYSPACE_NAME,
                SalesAnalyticsMapper.TABLE_NAME);
        updateStatement.with(QueryBuilder.set(VALUE_NAME, 15));
        Clause clause = QueryBuilder.eq(KEY_NAME, "DE");
        Clause conditionalClause = QueryBuilder.eq(VALUE_NAME, 10);
        // Append the WHERE clause, then the IF condition via onlyIf
        updateStatement.where(clause);
        updateStatement.onlyIf(conditionalClause);
        this.executeAndAssert(updateStatement, "DE", 15);

The key clause is the conditionalClause, which is appended to the statement by calling the onlyIf method on the Update object. The above code sets v=15 where k='DE', but only if the current value of v is still 10. In the storm-cassandra-cql use case, this causes the state to increment only if the value that was read hasn't changed underneath it.
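The read-modify-write loop that conditional updates enable can be sketched in plain Java. As an assumption for illustration, `ConcurrentMap.replace(key, expected, updated)` stands in for `UPDATE ... SET v=updated WHERE k=key IF v=expected`, and its boolean result plays the role of the `[applied]` column. This is an in-memory analogy, not storm-cassandra-cql or driver code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// In-memory analogy of an incremental state update built on a conditional write.
final class ConditionalIncrement {

    // Returns the attempt number on which the update was applied.
    static int increment(ConcurrentMap<String, Long> table, String key, long delta, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Long current = table.get(key);                   // read
            if (current == null) {
                // Row absent: analogous to INSERT ... IF NOT EXISTS.
                if (table.putIfAbsent(key, delta) == null) return attempt;
                continue;
            }
            long updated = current + delta;                  // modify
            if (table.replace(key, current, updated)) {      // conditional write (IF v = current)
                return attempt;                              // [applied] = true
            }
            // [applied] = false: the value changed underneath us, so re-read and retry.
        }
        throw new IllegalStateException("update not applied after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) throws InterruptedException {
        ConcurrentMap<String, Long> table = new ConcurrentHashMap<>();
        table.put("DE", 10L);
        Runnable worker = () -> {
            for (int i = 0; i < 1_000; i++) increment(table, "DE", 1L, 1_000_000);
        };
        Thread a = new Thread(worker);
        Thread b = new Thread(worker);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(table.get("DE")); // 2010: no increments are lost
    }
}
```

Without the condition, two writers that read the same value would both write `value + 1`, and one increment would silently disappear.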

I've encapsulated a couple of conditional updates in a unit test within storm-cassandra-cql. Have a look to see a more complete set of calls (with comments and assertions).

Tuesday, February 4, 2014

Work started on Storm-Cassandra-CQL!

As I laid out in my previous post, there are a number of motivations to start using CQL.  CQL has better support for batching, conditional updates, and collections. (IMHO)  For those reasons, I've started porting our Trident State implementation to CQL.

The implementation has the same Mapper concept. Simply implement the Mapper interface: map Storm tuples to CQL3 statements.

For example:
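   public Statement map(TridentTuple tuple) {
        Update statement = QueryBuilder.update("mykeyspace", "mytable");
        String field = "col1";
        String value = tuple.getString(0);
        Assignment assignment = QueryBuilder.set(field, value);
        // Key the row on column t, a value in 0..9 derived from the current time
        long t = System.currentTimeMillis() % 10;
        Clause clause = QueryBuilder.eq("t", t);
        // Attach the SET assignment and the WHERE clause to the update
        statement.with(assignment).where(clause);
        return statement;
   }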

(From the ExampleMapper)

The CQL3 statements are then collected and submitted as a batch inside the State implementation.

Below is an example topology:
    public static StormTopology buildTopology() {
        LOG.info("Building topology.");
        TridentTopology topology = new TridentTopology();
        ExampleSpout spout = new ExampleSpout();
        Stream inputStream = topology.newStream("test", spout);
        ExampleMapper mapper = new ExampleMapper();
        // Each partition maps its tuples to CQL3 statements and writes them as a batch
        inputStream.partitionPersist(new CassandraCqlStateFactory(),
                                     new Fields("test"),
                                     new CassandraCqlStateUpdater(mapper));
        return topology.build();
    }

Presently the implementation is *very* simple.  We know we'll need to enhance the batching mechanism.  (e.g. What happens when the size of a batch in Storm exceeds the batch size limit in CQL3?  Bad things. =)
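One possible direction, sketched under assumptions, is to split a Storm batch into several smaller CQL batches of at most `maxPerBatch` statements. The class name and the `maxPerBatch` parameter are hypothetical. The right size is something to tune, not a documented Cassandra constant.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: split one (possibly large) Storm batch into bounded sub-batches.
final class BatchChunker {

    static <T> List<List<T>> chunk(List<T> statements, int maxPerBatch) {
        if (maxPerBatch <= 0) throw new IllegalArgumentException("maxPerBatch must be positive");
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < statements.size(); start += maxPerBatch) {
            int end = Math.min(start + maxPerBatch, statements.size());
            // Copy the slice so each sub-batch is independent of the source list.
            batches.add(new ArrayList<>(statements.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> statements = Arrays.asList("s1", "s2", "s3", "s4", "s5");
        System.out.println(chunk(statements, 2)); // [[s1, s2], [s3, s4], [s5]]
    }
}
```

There is a trade-off. Each sub-batch is submitted separately, so a failure partway through leaves some sub-batches written. That is one more reason to keep the writes idempotent, so the whole Storm batch can be safely replayed.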

At first glance though, this approach for Storm / Cassandra integration is much simpler than our previous implementation and will allow users to leverage the power and features available in CQL3.  (e.g. We have grand plans to expose / utilize conditional updates to realize incremental state updates from Storm -- more on that later)

I'd encourage people to give it a spin... and submit contributions!