Tuesday, March 25, 2014

Shark on Cassandra (w/ Cash) : Interrogating cached data from C* using HiveQL

As promised, here is part deux of the Spark/Shark on Cassandra series.

In the previous post, we got up and running with Spark on Cassandra.   Spark gave us a way to report off of data stored in Cassandra.  It was an improvement over MR/Hadoop, but we were still left articulating our operations in Java code.  Shark provides an integration layer between Hive and Spark, which allows us to articulate operations in HiveQL at the shark prompt.  This enables a non-developer community to explore and analyze data in Cassandra.

Set Up a Spark Cluster

Before we jump to Shark, let's get a Spark cluster going.  To start a Spark cluster, first start the master server with:
$SPARK_HOME> bin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /Users/bone/tools/spark-0.8.1-incubating-bin-hadoop2/bin/../logs/spark-bone-org.apache.spark.deploy.master.Master-1-zen.local.out

To ensure the master started properly, tail the logs:
14/03/25 19:54:42 INFO ActorSystemImpl: RemoteServerStarted@akka://sparkMaster@zen.local:7077
14/03/25 19:54:42 INFO Master: Starting Spark master at spark://zen.local:7077
14/03/25 19:54:42 INFO MasterWebUI: Started Master web UI at

In the log output, you will see the master Spark URL (e.g. spark://zen.local:7077).  You will also see the URL for the web UI.  Cut and paste that URL into a browser and have a look at the UI. You'll notice that no workers are available.  So, let's start one:
$SPARK_HOME> bin/start-slaves.sh
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/bone/tools/spark-0.8.1-incubating-bin-hadoop2/bin/../logs/spark-bone-org.apache.spark.deploy.worker.Worker-1-zen.local.out

Again, tail the logs.  You should see the worker successfully register with the master.  You should also see the worker show up in the web UI.  And now, we are ready to get moving with Shark.
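
If you'd rather not eyeball the logs, here is a minimal shell sketch that pulls the master URL out of the master log.  The function name master_url_from_log is my own invention, and it assumes the log contains a spark://host:port URL like the one in the output shown earlier.

```shell
# Print the first spark:// URL found in a Spark master log file.
# Usage: master_url_from_log <path-to-master-log>
# (master_url_from_log is a hypothetical helper, not part of Spark.)
master_url_from_log() {
  # -o prints only the matching text; the URL ends at the first whitespace.
  grep -o 'spark://[^[:space:]]*' "$1" | head -n 1
}
```

To watch a log live, run tail -f on the path that start-master.sh (or start-slaves.sh) printed.  Calling master_url_from_log on the master log gives you the value you will need for the MASTER setting when configuring Shark.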

Set Up Shark

First, download Shark and Hive.  I used shark-0.8.1-bin-hadoop1.tgz and hive-0.9.0-bin.tgz. Untar each of those.  In the $SHARK_HOME/conf directory, copy the shark-env.sh.template file to shark-env.sh and edit the file.  Ensure the settings are configured properly.  For example:
export SPARK_MEM=4g
export SCALA_HOME="/Users/bone/tools/scala-2.9.3"
export HIVE_HOME="/Users/bone/tools/hive-0.9.0-bin"
export SPARK_HOME="/Users/bone/tools/spark-0.8.1-incubating-bin-hadoop2"
export MASTER="spark://zen.local:7077"
Note that the MASTER variable is set to the master URL from the Spark cluster.  Make sure that the HADOOP_HOME variable is *NOT* set.  Shark can operate directly on Spark (you need not have Hadoop deployed).
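
Since a stray HADOOP_HOME or a mistyped MASTER is easy to miss, a quick preflight check can help.  This is only a sketch: check_shark_env is a hypothetical helper of mine, and it assumes you want the standalone cluster setup from this walkthrough (so MASTER must look like spark://host:port).

```shell
# Return 0 if the current environment matches the settings described above.
# (check_shark_env is a hypothetical helper, not part of Shark.)
# The body runs in a subshell, so its variables do not leak into your session.
check_shark_env() (
  status=0
  # Shark should run directly on Spark here, so HADOOP_HOME must be unset.
  if [ -n "${HADOOP_HOME:-}" ]; then
    echo "HADOOP_HOME is set to '$HADOOP_HOME'; unset it before starting Shark" >&2
    status=1
  fi
  # MASTER should be the spark://host:port URL reported by the master log.
  case "${MASTER:-}" in
    spark://?*:?*) ;;
    *)
      echo "MASTER should look like spark://host:port, got '${MASTER:-}'" >&2
      status=1
      ;;
  esac
  exit "$status"
)
```

One way to use it is to source conf/shark-env.sh in a shell and then run check_shark_env; a non-zero exit status means one of the two checks failed, and the reason is printed to stderr.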

As with Spark, we are going to use an integration layer developed by TupleJump.  The integration layer is called Cash.

To get started, clone the Cash repo and follow its build instructions.  In summary, build the project and copy target/*.jar and target/dependency/cassandra-*.jar into $HIVE_HOME/lib.
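
The copy step can be sketched as a small shell function.  The name install_cash_jars is hypothetical, and I'm assuming the first argument is whichever directory of your Cash checkout contains target/ after the build; the build itself is not shown here.

```shell
# Copy the Cash jars and the Cassandra dependency jars into Hive's lib directory.
# Usage: install_cash_jars <dir-containing-target> <hive-home>
# (install_cash_jars is a hypothetical helper, not part of Cash or Hive.)
install_cash_jars() {
  cash_dir=$1
  hive_home=$2
  # Refuse to copy if the Hive install does not look right.
  if [ ! -d "$hive_home/lib" ]; then
    echo "no lib directory under '$hive_home'" >&2
    return 1
  fi
  # Same two globs as in the text: the project jars plus cassandra-*.jar.
  cp "$cash_dir"/target/*.jar \
     "$cash_dir"/target/dependency/cassandra-*.jar \
     "$hive_home/lib/"
}
```

For example: install_cash_jars . "$HIVE_HOME" when run from the directory that holds target/.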

Play with Shark

Fun time.  Start shark with the following:
bone@zen:~/tools/shark-> bin/shark

Note that there are two other versions of this command (bin/shark-withinfo and bin/shark-withdebug).  Both are *incredibly* useful if you run into trouble. 

Once you see the shark prompt, you should be able to refresh the Spark Web UI and see Shark under Running Applications.  To get started, first create a database.  Using the schema from our previous example/post, let's call our database "northpole":
shark> create database northpole;
Time taken: 0.264 seconds

Next, you'll want to create an external table that maps to your Cassandra table with:
shark> CREATE EXTERNAL TABLE northpole.children(child_id string, country string, first_name string, last_name string, state string, zip string)
     >    STORED BY 'org.apache.hadoop.hive.cassandra.cql.CqlStorageHandler'
     >    WITH SERDEPROPERTIES ("cql.primarykey"="child_id", "comment"="check", "read_repair_chance"="0.1", "cassandra.host"="localhost", "cassandra.port"="9160", "dclocal_read_repair_chance"="0.0", "gc_grace_seconds"="864000", "bloom_filter_fp_chance"="0.1", "cassandra.ks.repfactor"="1", "compaction"="{'class' : 'SizeTieredCompactionStrategy'}", "replicate_on_write"="false", "caching"="all");
Time taken: 0.419 seconds

At this point, you are free to execute some HiveQL queries!  Let's do a simple select:
shark> select * from northpole.children;
977.668: [Full GC 672003K->30415K(4054528K), 0.2639420 secs]
michael.myers USA Michael Myers PA 18964
bart.simpson USA Bart Simpson CA 94111
johny.b.good USA Johny Good CA 94333
owen.oneill IRL Owen O'Neill D EI33
richie.rich USA Richie Rich CA 94333
collin.oneill IRL Collin O'Neill D EI33
dennis.menace USA Dennis Menace CA 94222
Time taken: 13.251 seconds


How cool is that? Now, let's create a cached table!
shark> CREATE TABLE child_cache TBLPROPERTIES ("shark.cache" = "true") AS SELECT * FROM northpole.children;
Moving data to: file:/user/hive/warehouse/child_cache
Time taken: 10.294 seconds

And finally, let's try that select again, this time against the cached table:
shark> select * from child_cache;
owen.oneill IRL Owen O'Neill D EI33
bart.simpson USA Bart Simpson CA 94111
michael.myers USA Michael Myers PA 18964
dennis.menace USA Dennis Menace CA 94222
richie.rich USA Richie Rich CA 94333
johny.b.good USA Johny Good CA 94333
collin.oneill IRL Collin O'Neill D EI33
Time taken: 6.511 seconds
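
If you find yourself re-running these statements, one option is to keep them in a script file.  The sketch below writes a small HiveQL file: the function name, the DROP TABLE line, and the GROUP BY query are illustrative additions of mine, and I'm assuming the shark launcher accepts Hive's -f option for running a script file.

```shell
# Write a HiveQL script that rebuilds the cached table and queries it.
# Usage: write_cache_script <output-file>
# (write_cache_script, the DROP TABLE line and the GROUP BY query are
# illustrative; only the CREATE TABLE statement comes from the walkthrough.)
write_cache_script() {
  # The quoted 'EOF' keeps the shell from expanding anything in the HiveQL.
  cat > "$1" <<'EOF'
DROP TABLE IF EXISTS child_cache;
CREATE TABLE child_cache TBLPROPERTIES ("shark.cache" = "true") AS SELECT * FROM northpole.children;
SELECT country, COUNT(*) FROM child_cache GROUP BY country;
EOF
}
```

Under that assumption, usage would look like: write_cache_script /tmp/child_cache.hql, followed by bin/shark -f /tmp/child_cache.hql from $SHARK_HOME.  Note that the script drops any existing child_cache table before recreating it.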


Alrighty, that should get you started.
Again -- kudos to TupleJump for all their work on the Spark/Shark -> C* bridge.

Wednesday, March 19, 2014

Pleasantly Uncomfortable : Innovation run rampant.

Over the last three years, we've built out a kick-ass platform for data management and analytics.

Early on, it was a lot of new technology.  We were integrating and deploying technologies at a rapid rate, almost one per month: dropwizard, spring, extjs then angular, cassandra, solr then elasticsearch, kafka, zookeeper, storm, titan, github, puppet, etc.  It was a whirlwind.

The new technologies substantially increased productivity and agility.  The platform was working. Product development became capabilities composition.

But recently, it occurred to me that the people we hired along the way and the processes we implemented to support that rapid technical evolution are more powerful than the platform itself. To support that platform approach, we adopted open-source dynamics internally.  Anyone can contribute to any module, just submit a pull request.  Teams are accountable for their products, but they are free to enhance and contribute to any module in the platform.  Those dynamics have allowed us to keep the innovative/collaborative spirit as we grow the company.

And oh my, are we growing...

We now have half-a-dozen product development teams. Each is a cross-discipline (dev, qa, ba, pm) mix of onshore and offshore resources.  The product development teams are enabled by another half-dozen teams that provide infrastructure and support services (ux/ui design, data/information architecture, infrastructure, maintenance, and devops).  The support teams pave the way for the product development teams, so they can rock out features and functionality at warp speed.  For the most part, the product teams use Scrum while the support teams use Kanban so they can react quickly to changing priorities and urgent needs.

Each month we have sprint reviews that mimic a science fair.  Teams have started dressing up and wearing costumes.  It is fun.  Plain and simple.   But at this last sprint review, something happened to me that has never happened in my career.

I've spent my years at two different types of companies:  startups and large enterprises.  At the startups, every innovation (and all of the code) was generated by hands at keyboards within a single room (or two).  You knew everything that was going into the product, every second of the day.  At large enterprises, innovation was stifled by process.  You knew everything that was happening because things happened at a turtle's pace.  At HMS, we've got something special, a balance between those worlds.

At the last sprint review, I was surprised... for the first time in my career.  The teams presented innovations that I didn't know about and never would have come up with on my own.  There were beautiful refactorings and enabling technical integrations.  But honestly, I was uncomfortable.  I thought maybe I was losing my edge.  I questioned whether I had reached a point where I could no longer keep up with everything.  It was disconcerting.

I spent a couple of hours moping.  Then in a moment of clarity, I realized that I was a member of an Innovative Organization: an organization at an optimal balance point between process and productivity, where the reins of innovation were in everyone's hands -- with a platform and processes that supported them.

Yeah, this sounds corny.  But I kid you not, it is amazing.  We've gone from a situation where a few champions were moving boulders up mountains, to a state where entire teams are leaning forward and pulling the organization along.  I'm now happy to enjoy the ride. (but you can bet your ass, I'm going to try to race ahead and meet them at the pass =)

(Kudos to @acollautt, @irieksts, @jmosco, @pabrahamsson, @bflad for giving me this uncomfortable wedgie-like feeling)