Docker IV: Spark for Cassandra Data Analysis

Spark is a general cluster-computing framework, and in our case we will use it to process data from the Cassandra cluster. As we saw in Part I, we cannot run just any type of query on a Cassandra table. But by running a Spark worker on each host running a Cassandra node, we can efficiently read/analyse all of its data in a distributed way. Each Spark worker (slave) will read the data from its local Cassandra node and send the result back to the Spark driver (master).

Diagram: Spark on Cassandra Cluster

The Docker Image

Image source code: https://github.com/nicomak/[…]/spark
Image on DockerHub: https://hub.docker.com/r/nicomak/spark/

No official image exists for Spark, so we need to create our own. The image I created is very basic: a Debian-with-Python base image on top of which I’ve installed a Java 8 JDK. The Spark 2.0.1 binaries are simply extracted from the original release tarball into the /app/ folder.

There is just a small subtlety for running the container. The Spark startup scripts launch their daemons in the background, so they cannot be used as the main container process, otherwise the container would die immediately. The main process of the container is the entrypoint.sh script, and to keep it alive forever I simply run tail -f on the log folder. This way the logs are also written to the standard output and can be seen by running docker logs <container>.
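For the curious, here is a minimal sketch of what such an entrypoint can look like (the real script is in the image’s source repository; the exact paths and arguments here are assumptions):

    #!/bin/bash
    # entrypoint.sh (sketch): start the requested Spark daemon in the background,
    # then tail the logs so the container's main process never exits and the
    # output is visible through "docker logs".
    SPARK_HOME=/app/spark-2.0.1-bin-hadoop2.7

    if [ "$1" = "master" ]; then
        "$SPARK_HOME/sbin/start-master.sh"
    else
        "$SPARK_HOME/sbin/start-slave.sh" "$2"   # $2 is the master URL, e.g. spark://ubuntu0:7077
    fi

    tail -f "$SPARK_HOME"/logs/*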

Another problem is port mapping. Spark opens a lot of random ports for communication between the master and the slaves. This is difficult to manage with Docker, so we will simply use the --net=host network mode to let the containers open any port they want on the host.

Running the Spark Containers

We will install the Spark containers on top of our existing Cassandra cluster from Part I, as illustrated in the introduction above.

To start a cluster with the master (driver) on ubuntu0 and slaves (workers) on ubuntu[1-3]:
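With this image, the commands look like this (assuming the entrypoint sketched earlier, which takes the Spark role, and for slaves the master URL, as arguments):

    # On ubuntu0: the master (driver)
    docker run -d --name spark --net=host nicomak/spark master

    # On ubuntu1, ubuntu2 and ubuntu3: one slave (worker) each
    docker run -d --name spark --net=host nicomak/spark slave spark://ubuntu0:7077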

After starting them, the master UI should show all connected workers:

Spark master UI with 3 worker containers

The Cassandra Data

In Part I we had 3 Cassandra nodes on which we created a table called posts. We saw that rows of this table are partitioned (or sharded) across the 3 nodes based on the primary key, which is the username. If the replication factor is 2, then each partition will have 2 copies (on separate nodes).

For this example I reused the same posts table but set a replication factor of 1. This can be updated by altering the keyspace:
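Assuming the keyspace from Part I is named blog (adapt the name to your own setup):

    ALTER KEYSPACE blog
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

Note that after decreasing the replication factor, a nodetool cleanup on each node is needed to actually remove the extra replicas.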

There is a nodetool command in Cassandra to find out which primary key will end up in which node:
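That command is getendpoints, which prints the IP address of the replica(s) owning a given partition key. For example, against the cass1 container (still with the assumed blog keyspace):

    docker exec cass1 nodetool getendpoints blog posts nicolas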

Then I used the following command to find 3 different usernames which will be sharded into separate nodes:
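A small shell loop around getendpoints does the trick; the list of candidate usernames below is made up for the illustration:

    for user in lepellen arnaud nicolas marie thomas; do
        echo -n "$user -> "
        docker exec cass1 nodetool getendpoints blog posts "$user"
    done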

And I inserted new posts for these 3 users:
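Along these lines; the column names are an assumption, so adapt them to the posts schema actually created in Part I:

    -- Assumed schema: posts(username text, time timeuuid, content text)
    INSERT INTO blog.posts (username, time, content)
    VALUES ('lepellen', now(), 'Just booked a trip to Iceland');

    INSERT INTO blog.posts (username, time, content)
    VALUES ('arnaud', now(), 'Listening to a great jazz record');

    INSERT INTO blog.posts (username, time, content)
    VALUES ('nicolas', now(), 'Programming in Scala all day');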

So the data is composed of posts:

  • By lepellen, about travelling, stored in the cass1 container on the ubuntu1 machine.
  • By arnaud, about jazz music, stored in the cass2 container on the ubuntu2 machine.
  • By nicolas, about programming, stored in the cass3 container on the ubuntu3 machine.

In the next section we will use this data to see if the deployed workers take advantage of data locality.

Using the Cassandra Connector

First let’s run a spark shell (Scala) in the master container:
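The command looks like this (the connector version is an assumption; pick the release matching your Spark version on spark-packages.org):

    docker exec -it spark /app/spark-2.0.1-bin-hadoop2.7/bin/spark-shell \
        --master spark://ubuntu0:7077 \
        --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11 \
        --conf spark.cassandra.connection.host=ubuntu1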

There are 3 options:

  • --master: This time we specify the master URL, to use the cluster in standalone mode with all its workers.
  • --packages: This downloads a given library (in this case the Datastax Cassandra connector) from https://spark-packages.org/ as well as its dependencies.
  • --conf: We specify a Cassandra node in a property that the connector will use to auto-discover all the other nodes.

The startup output shows that the connector library is downloaded from spark-packages, and then Apache Ivy takes care of downloading all its dependencies from the Maven central repository.

We can then use the input format provided by the connector to load the table into a distributed DataFrame:
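Still assuming the blog keyspace, that gives:

    val df = spark.read
        .format("org.apache.spark.sql.cassandra")
        .options(Map("keyspace" -> "blog", "table" -> "posts"))
        .load()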

Now we can perform distributed computing tasks such as a Word Count. But let’s do something more interesting and create an inverted index which maps each word in all posts to the IP address of the worker which processed it:
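Here is a sketch of that job; it assumes the post text lives in a content column. The trick is that InetAddress.getLocalHost is evaluated inside the map closure, i.e. on whichever worker processes the row:

    import java.net.InetAddress

    // (word, worker address) pairs for every word of every post
    val index = df.select("content").rdd
        .flatMap(row => row.getString(0).toLowerCase.split("\\W+"))
        .filter(_.nonEmpty)
        .map(word => (word, InetAddress.getLocalHost.getHostAddress))
        .distinct()

    index.collect().foreach(println)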

Below are the results of this job. If you compare the mapping with the data described in the previous section, you can see that each worker processed the data from the Cassandra node on the same host. The connector did its job well!

Bonus: Using Python

The options are the same when starting PySpark:
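Only the shell binary changes:

    docker exec -it spark /app/spark-2.0.1-bin-hadoop2.7/bin/pyspark \
        --master spark://ubuntu0:7077 \
        --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11 \
        --conf spark.cassandra.connection.host=ubuntu1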

Loading the table looks like this:
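Still with the assumed blog.posts table:

    df = spark.read \
        .format("org.apache.spark.sql.cassandra") \
        .options(keyspace="blog", table="posts") \
        .load()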

And finally the index generation job looks like this:
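A Python sketch of the same job, with the same content column assumption as the Scala version:

    import socket

    def tag_with_worker_ip(word):
        # Runs on the worker that processes the word, so this is the worker's own address
        return (word, socket.gethostbyname(socket.gethostname()))

    index = (df.select("content").rdd
               .flatMap(lambda row: row[0].lower().split())
               .map(tag_with_worker_ip)
               .distinct())

    for word, ip in index.collect():
        print(word, ip)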

Conclusion

Using Spark we can query and process Cassandra data any way we want. Now that we have all the containers we need, let’s move on to the orchestration part and see how we can organize their deployment in a cluster.