Monday, October 5, 2015

Getting started w/ the Python Kinesis Client Library (KCL) (via IPython Notebook)

We are racing ahead with Kinesis.  With streaming events in hand, we need to get our data scientists tapped in. One problem: the KCL and KPL are heavily focused on Java, but our data scientists (and the rest of our organization) love Python. =)

We use IPython Notebook to share Python code/samples.  So, in this blog I'm going to get you up and running, consuming events from Kinesis, using Python and IPython Notebook.  If you are already familiar with IPython Notebook, you can pull the notebook from here:

If you aren't familiar with IPython Notebook, first go here to get a quick overview.  In short, IPython Notebook is a great little webapp that lets you play with Python commands via the browser, creating a wiki/gist-like "recipe" that can be shared with others.  Once familiar, read on.

To get started with KCL-python, let's clone the amazon-kinesis-client-python github repo:

   git clone https://github.com/awslabs/amazon-kinesis-client-python.git

Then, create a new virtualenv.  (and if you don't use virtualenv, get with the program =)
   virtualenv venv/kcl 
   source venv/kcl/bin/activate

Next, get IPython Notebook set up.  Install it by running the following commands from the directory containing the cloned repo:

   pip install "ipython[notebook]"
   ipython notebook

That should open a new tab in your browser.  Start a new notebook by clicking "New->Python 2".  A notebook comprises one or more cells, and each cell has a type.  For this exercise, we'll use "Code" cells.  We'll also take advantage of the "magic" functions available in IPython Notebook -- specifically run, which executes a Python script.  First, we need to execute the Python setup for the KCL.

The amazon-kinesis-client-python library actually rides on top of a Java process and uses the MultiLangDaemon for interprocess communication.  Thus, to get the KCL set up, you need to download the jars.  You can do that in the notebook by entering and executing the following in the first cell:

   run setup.py download_jars

Next, you need to install the package with:

   run setup.py install

That command installs a few libraries for Python (specifically -- it installs boto, the AWS library for Python).  Because the notebook doesn't dynamically reload, you'll need to restart IPython Notebook and re-open the notebook. (hmmm... Is there a way to reload the virtualenv used by the notebook w/o restarting?)

Once reloaded, you are ready to start emitting events into a Kinesis stream.  For now, we'll use the sample application in the KCL repo.  Create a new cell and run:

   run samples/sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster

From this, you should see:

Connecting to stream: words in us-east-1
Put word: cat into stream: words
Put word: dog into stream: words
Put word: bird into stream: words
Put word: lobster into stream: words

Woo hoo! We are emitting events to Kinesis!  Now, we need to consume them. =)

To get that going, you will want to edit samples/sample.properties.  This file is loaded by the KCL to configure the consumer.  Most importantly, have a look at the executableName property in that file.  This sets the name of the Python script that the KCL will execute when it receives records.  In the sample, this is sample_kclpy_app.py.
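
For reference, the relevant lines in samples/sample.properties look roughly like this (exact names/values may differ between versions of the repo, so double-check your checkout):

   # excerpt from samples/sample.properties
   executableName = sample_kclpy_app.py
   streamName = words
   applicationName = PythonKCLSample
   AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
   initialPositionInStream = TRIM_HORIZON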

Have a look at that sample Python code.  You'll notice there are four important methods that you will need to implement: init, process_records, checkpoint, and shutdown.  The purpose of those methods is almost self-evident, and the documentation in the sample is quite good.  Have a read through it.
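
To give you a feel for the shape of a consumer, here is a minimal sketch of a record processor modeled on the sample (treat the details as illustrative -- check the sample in the repo for the exact signatures):

   import base64

   from amazon_kclpy import kcl

   class WordProcessor(kcl.RecordProcessorBase):

       def init(self, shard_id):
           # Called once when this worker is assigned the shard.
           self.shard_id = shard_id

       def process_records(self, records, checkpointer):
           # Records arrive as dicts; the payload is base64-encoded.
           for record in records:
               word = base64.b64decode(record.get('data'))
               print 'Got word: %s' % word
           # Checkpoint so we don't re-read these records after a restart.
           self.checkpoint(checkpointer)

       def checkpoint(self, checkpointer):
           try:
               checkpointer.checkpoint()
           except kcl.CheckpointError as e:
               print 'Checkpoint failed: %s' % e

       def shutdown(self, checkpointer, reason):
           # On TERMINATE the shard is closed, so checkpoint one last time.
           if reason == 'TERMINATE':
               self.checkpoint(checkpointer)

   if __name__ == '__main__':
       kcl.KCLProcess(WordProcessor()).run()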

Also in the properties file, notice the AWSCredentialsProvider property.  Since the KCL uses Java underneath the covers, you need to set the Java classname of the credentials provider.  If left unchanged, it will use DefaultAWSCredentialsProviderChain.

It is worth looking at that documentation, but probably the easiest route to get authentication working is to create a ~/.aws/credentials file that contains the following:

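Something like this, with your own keys substituted:

   [default]
   aws_access_key_id = YOUR_ACCESS_KEY_ID
   aws_secret_access_key = YOUR_SECRET_ACCESS_KEY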

Once you have your credentials set up, you can crank up the consumer by executing the following command in another notebook cell:

   run samples/amazon_kclpy_helper.py --print_command --java /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/bin/java --properties samples/sample.properties

Note that the --java parameter needs the location of the java executable.  Running that in the notebook will print out the command line you can use to consume the stream.  Go ahead and cut-and-paste that into a command line and execute it.  At this point, you are up and running.

To get started with your own application, simply replace the sample code with your own, update the properties file, and you should be off and running.

Now, I find this bit hokey.  There is no native Python consumer (yet!).  In fact, you actually need to run *java*, which turns around and calls Python.  Honestly, with this approach, you aren't getting much from the Python library over what you would get from the Java library (except for some pipe manipulation in the MultiLangDaemon).  Sure, it is a micro-services approach... but maybe it would be better to simply run a little Python web service (behind a load balancer?).  Then we'd be able to scale the consumers without re-sharding the Kinesis stream. (One shard/KCL worker could fan out work to many Python consumers across many boxes.)  You would need to be careful about checkpointing, but it might be a bit more flexible. (And it would completely decouple the "data science" components from Kinesis.)  Definitely food for thought. =)
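
As a sketch of that fan-out idea, the record processor could simply forward each batch to a web service (everything here is hypothetical -- the endpoint URL and payload format are made up):

   import base64
   import json
   import urllib2

   # Hypothetical load-balanced endpoint fronting the Python consumers.
   SERVICE_URL = 'http://consumers.example.com/events'

   def forward_records(records):
       # Decode the Kinesis payloads and hand them off over HTTP,
       # decoupling consumer scaling from the stream's shard count.
       words = [base64.b64decode(r.get('data')) for r in records]
       req = urllib2.Request(SERVICE_URL, json.dumps(words),
                             {'Content-Type': 'application/json'})
       urllib2.urlopen(req)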

As always, let me know if you run into trouble.

Friday, September 25, 2015

Druid : Vagrant Up (and Tranquility!)

We've been shown the light.  

After a great meeting with Fangjin and Gian, we are headed down the Tranquility route for real-time ingestion from Kinesis to Druid.  For me, that means getting comfortable with the expanded set of node types in the Druid cluster.

There is no better way to get comfortable with the cluster and the new node types, than to spin up a complete environment and kick the tires a bit.  And there is no better way to do that than "vagrant up"!

So, this morning I set out to create a complete environment suitable for Tranquility, inclusive of middle managers (which are typically omitted from a standalone "simple" cluster).  I started with Quantiply's vagrant config for Druid 0.6.160, forked the repo, and went to work.

If you are impatient, you can find my fork here and get going.  If you have the patience...

It's important to understand the anatomy of a Druid cluster.  First off, Druid relies on Zookeeper and MySQL.  The provisioning script installs vanilla versions of these, and creates a druid database and user in MySQL.  (Druid itself populates the schema at startup.)

The following is a list of the server types in a Druid cluster.  On our vagrant server, each of the servers occupies a different port and has its own configuration (also detailed below).

Overlord: (port 8080)

The overlord is responsible for task management on the Druid cluster.  Since Tranquility is largely an orchestration engine, doling out tasks to create segments as they form in real-time, the overlord is the entry point into the Druid cluster for Tranquility. 

Coordinator: (port 8081)

The coordinator is responsible for segment management, telling nodes to load/drop segments.

Broker: (port 8090)

The Broker is the query proxy in the system.  It receives the query, knows which nodes have which segments, and proxies the query to the respective nodes.  (See the query sketch after this list.)

Historical: (port 8082)

Historical nodes are the beasts that load the segments and respond to queries for "static" data.  Once a segment has been committed to deep storage, it is loaded by a historical node, which responds to queries.

MiddleManager: (port 8100)

MiddleManagers are exactly that. =)  They push tasks to Peons, which they spawn on the same node.  Right now, one Peon works on one task, which produces one segment.  (That may change.)
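
Once the cluster is up, you can sanity-check the Broker with a simple query from Python.  This is just a sketch: the datasource name is hypothetical, and the port assumes the list above (adjust if your Vagrantfile forwards ports differently):

   import json
   import urllib2

   # timeBoundary is the simplest Druid query: it returns the min/max
   # event timestamps for a datasource.
   query = {'queryType': 'timeBoundary', 'dataSource': 'words'}
   req = urllib2.Request('http://localhost:8090/druid/v2/?pretty',
                         json.dumps(query),
                         {'Content-Type': 'application/json'})
   print urllib2.urlopen(req).read()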

Special Notes about Real-Time nodes:
Per my previous blog, we were debating the use of a real-time (RT) node.  But with Tranquility, you don't need RT nodes.  They are replaced with transient peon instances that run a temporary firehose, ingesting the events for that segment directly from the Tranquility client.

Druid is targeting highly-available, fully-replicated, transactional real-time ingestion.  Since RT nodes share nothing, they are unable to coordinate and unable to replicate data, which means they don't fit well into the strategic vision for the project.  Tranquility, and its coordinated ingestion model, may eventually obsolesce/replace RT nodes entirely. (See #1642 for more information.)

Getting Started

OK -- down to business.

To fire up your own cluster, simply clone the repository and "vagrant up".  Once things are up and running, you can:

Hit the Overlord on port 8080

Hit the Coordinator on port 8081

In my next blog, I'll detail the client-side of integrating Tranquility using the direct Finagle API.