Thursday, May 14, 2015

Spark SQL against Cassandra Example


Spark SQL is awesome.  It lets you query any Resilient Distributed Dataset (RDD) using SQL, including data stored in Cassandra!

The first thing to do is create a SQLContext from your SparkContext.  I'm using Java so...
(sorry -- I'm still not hip enough for Scala)

        JavaSparkContext context = new JavaSparkContext(conf);
        JavaSQLContext sqlContext = new JavaSQLContext(context);
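
(In case you're wondering where conf comes from, it's just a plain SparkConf.  Here's a rough sketch of how you might build one -- the app name, master, and Cassandra host below are placeholders, so adjust them for your own setup:)

        SparkConf conf = new SparkConf()
                .setAppName("spark-sql-cassandra-example")              // any name you like
                .setMaster("local[*]")                                  // or your cluster's master URL
                .set("spark.cassandra.connection.host", "127.0.0.1");   // where Cassandra is listening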


Now you have a SQLContext, but you have no data.  Go ahead and create an RDD, just like you would in regular Spark:


        JavaPairRDD<Integer, Product> productsRDD = 
            javaFunctions(context).cassandraTable("test_keyspace", "products",
                productReader).keyBy(new Function<Product, Integer>() {
            @Override
            public Integer call(Product product) throws Exception {
                return product.getId();
            }
        });

(The example above comes from the spark-on-cassandra-quickstart project, as described in my previous post.)
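
A quick word on productReader: it's a RowReaderFactory that tells the connector how to turn Cassandra rows into Product objects (the connector's CassandraJavaUtil.mapRowTo is one way to get one).  The real Product bean lives in the quickstart project; the sketch below is just an approximation -- only id and price actually show up in this post, so the name field is my own guess:

        // A plain Java bean -- Spark SQL will infer the table schema from its getters.
        public class Product implements Serializable {
            private Integer id;
            private String name;
            private Double price;

            public Product() { }

            public Integer getId() { return id; }
            public void setId(Integer id) { this.id = id; }

            public String getName() { return name; }
            public void setName(String name) { this.name = name; }

            public Double getPrice() { return price; }
            public void setPrice(Double price) { this.price = price; }
        }

        // The row reader maps Cassandra columns onto the bean by name.
        RowReaderFactory<Product> productReader = mapRowTo(Product.class);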

Now that we have a plain vanilla RDD,  we need to spice it up with a schema, and let the sqlContext know about it.  We can do that with the following lines:

        JavaSchemaRDD schemaRDD = sqlContext.applySchema(productsRDD.values(), Product.class);
        sqlContext.registerRDDAsTable(schemaRDD, "products");

Shazam.  Now your sqlContext is ready for querying.  Notice that it inferred the schema from the Java bean (Product.class).  (In the next blog post, I'll show how to do this dynamically.)

You can prime the pump with a:

        System.out.println("Total Records = [" + productsRDD.count() + "]");

The count operation forces Spark to actually go read the data (cache() the RDD first if you want it kept in memory for repeat queries), which makes queries like the following lightning fast:

        JavaSchemaRDD result = sqlContext.sql("SELECT id from products WHERE price < 0.50");
        for (Row row : result.collect()){
            System.out.println(row);
        }
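
Each Row prints well enough on its own, but you can also pull typed values out by position.  A small sketch, assuming id comes back as an int in the first (and only) column of this query:

        for (Row row : result.collect()) {
            int id = row.getInt(0);   // column 0 is "id" in the SELECT above
            System.out.println("Cheap product id = [" + id + "]");
        }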

That's it.  You're off to the SQL races.


P.S.  If you try querying the sqlContext without applying a schema and/or without registering the RDD as a table, you may see something similar to this:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'id, tree:
'Project ['id]
 'Filter ('price < 0.5)
  NoRelation$
