Friday, September 2, 2016

Running Spark Streaming on DCOS

In this post I will show how I created a Spark Streaming application and then deployed it on a DCOS cluster.

The Spark Streaming app connects to a Kafka topic and counts messages, reporting the number of messages read and the rate at which it reads them.

Cluster
- Azure DCOS 1.7
- 3 Master, 15 private agents, 3 public agents
- Deployed Kafka (0.10.0.0), 3 brokers, named "hub"

Create a Spark Streaming Application

Scala
https://github.com/david618/rtsink/blob/master/src/main/scala/com/esri/rtsink/scala/KafkaSparkCnt.scala

or
Java
https://github.com/david618/rtsink/blob/master/src/main/java/com/esri/rtsink/KafkaSparkCnt.java

I'll stick with the Scala version in this post.

The application consumes lines from a Kafka topic, counting the number of lines and the rate at which it reads them.

The application doesn't use the advanced features of Spark, so it may not parallelize very well. Spark breaks operations into segments and passes the work to the worker nodes. More complex tasks (e.g. word counts, parsing) would likely be a better use case for Spark.
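To make the per-batch counting concrete, here is a plain-Java stand-in sketch. This is hypothetical, not the actual rtsink code (which does this over a Spark DStream): group message timestamps into one-second batches and count each batch.

```java
import java.util.Map;
import java.util.TreeMap;

public class BatchCount {

    // Group timestamps (ms) into one-second batches and count each batch.
    static Map<Long, Integer> countPerBatch(long[] timestampsMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            long batchStart = (ts / 1000) * 1000;  // truncate to the second
            counts.merge(batchStart, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // 150 simulated messages, 10 ms apart (~1.5 seconds of traffic)
        long[] ts = new long[150];
        for (int i = 0; i < 150; i++) ts[i] = 1472836311000L + i * 10L;
        countPerBatch(ts).forEach((t, n) ->
                System.out.println("Time " + t + " ms: Count " + n));
    }
}
```

This prints one line per one-second batch (here 100 then 50), the same shape of output the app produces below.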

Compile the application. These are Maven projects, so they should compile without too much difficulty.

Simulator
For testing you can use the Simulator to send messages to Kafka.

https://github.com/david618/Simulator

This is a Maven project. It should also compile easily.

Run Locally 
Assuming you already have Java 1.8 installed and configured.

We are using a custom build of Spark 1.6 for Scala 2.11.  See my earlier blog post for instructions on how to create a custom build.  http://davidssysadminnotes.blogspot.com/2016/05/compiling-spark-161-for-scala-211.html

Extract the Spark archive (spark-1.6.1-bin-hadoop2.6_2.11.tgz) to your home directory. On Linux: tar xvzf spark-1.6.1-bin-hadoop2.6_2.11.tgz. I'm assuming you're working on Linux; doing this on Windows is more challenging, I'm sure.

I renamed the folder (spark-1.6.1-bin-hadoop2.6_2.11) to just spark.

You'll need to have a local cluster running.  You can create one using instructions from this blog post.  http://davidssysadminnotes.blogspot.com/2016/06/dcos-on-premises-install-centos.html

My local cluster has one master (m1) and two agents (a1 and a2).  I have Kafka running on a1:9092.

Change directory to the target folder from the rtsink project.

$ /home/david/spark/bin/spark-submit --class com.esri.rtsink.scala.KafkaSparkCnt --master local[4] rtsink-jar-with-dependencies.jar a1:9092 simFile2 group3 1 15000
START
a1:9092
brokers: a1:9092


In another terminal run a simulation. Change to the target folder for the Simulator. Running the jar without arguments prints the usage:

$ java -cp Simulator-jar-with-dependencies.jar com.esri.simulator.Kafka
Usage: Kafka <broker-list-or-hub-name> <topic> <file> <rate> <numrecords> (<burst-delay-ms>)

$ java -cp Simulator-jar-with-dependencies.jar com.esri.simulator.Kafka a1:9092 simFile2 ../simFile_1000_10s.json 100 1000

From the rtsink terminal you should see the results.

Time 1472836311000 ms: Count 89 total records counted)
Time 1472836312000 ms: Count 100 total records counted)
Time 1472836313000 ms: Count 99 total records counted)
Time 1472836314000 ms: Count 100 total records counted)
Time 1472836315000 ms: Count 100 total records counted)
Time 1472836316000 ms: Count 100 total records counted)
Time 1472836317000 ms: Count 100 total records counted)
Time 1472836318000 ms: Count 100 total records counted)
Time 1472836319000 ms: Count 100 total records counted)
Time 1472836320000 ms: Count 100 total records counted)
Time 1472836321000 ms: Count 12 total records counted)
Total Records Counted in test was 1000 at a rate of 99.98000399920016


Ctrl-C to end the program
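The total at the end is just records divided by elapsed wall-clock seconds. A small sketch of that arithmetic (the ~10.002 s elapsed time is inferred from the reported rate, not read from the log):

```java
public class RateCheck {

    // rate in records/second = total records / elapsed seconds
    static double rate(long totalRecords, long elapsedMs) {
        return totalRecords * 1000.0 / elapsedMs;
    }

    public static void main(String[] args) {
        // 1000 records over ~10.002 seconds, as in the run above
        System.out.println(rate(1000L, 10002L));  // ~99.98 records/s
    }
}
```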

You can also query the webport for details.

Health Check
$ curl localhost:15000;echo
{"healthy":true}
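A health endpoint like this can be served with the JDK's built-in com.sun.net.httpserver. This is a minimal sketch for illustration, not the actual rtsink implementation (which may use a different HTTP stack):

```java
import com.sun.net.httpserver.HttpServer;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class HealthEndpoint {

    // Start an HTTP server that answers every request with {"healthy":true}.
    static HttpServer start(int port) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/", exchange -> {
            byte[] body = "{\"healthy\":true}".getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().set("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(body);
            }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws Exception {
        HttpServer server = start(0);  // 0 = pick any free port
        int port = server.getAddress().getPort();
        try (InputStream in = new URL("http://localhost:" + port + "/").openStream()) {
            System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
        }
        server.stop(0);
    }
}
```

Passing 15000 instead of 0 would match the web port used in the local run above.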

Get Results
$ curl -s localhost:15000/count/ | jq "."
{
  "tm": 1472836322008,
  "rates": [
    99.98000399920016
  ],
  "counts": [
    1000
  ],
  "latencies": []
}

You may need to install jq (# apt-get install jq)
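If jq isn't handy, the first rate can also be pulled out in client code. A hypothetical sketch that regex-matches the fixed response shape (a real client would use a proper JSON library):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ParseCount {

    // Matches the first number inside the "rates" array of the /count response.
    static final Pattern RATE = Pattern.compile("\"rates\":\\s*\\[\\s*([0-9.]+)");

    static Double firstRate(String json) {
        Matcher m = RATE.matcher(json);
        return m.find() ? Double.valueOf(m.group(1)) : null;
    }

    public static void main(String[] args) {
        String json = "{\"tm\":1472836322008,\"rates\":[99.98000399920016],"
                + "\"counts\":[1000],\"latencies\":[]}";
        System.out.println(firstRate(json));  // the extracted rate
    }
}
```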


NOTE: To run/debug from an IDE you can uncomment the .setMaster("local[4]") line in the code.  Don't forget to comment it out again before creating the build for deployment to the cluster.

Run on DCOS/MESOS
You'll need to configure Spark and make it available to your Marathon app and to the Spark worker nodes.

Inside the Spark conf directory:

Make a copy of spark-env.sh.template

$ cp spark-env.sh.template  spark-env.sh

Edit the file and add a line.
JAVA_HOME=/opt/mesosphere/active/java/usr/java

This worked for the Azure cluster and for my on-premise configuration.

Make a copy of spark-defaults.conf.template

$ cp spark-defaults.conf.template spark-defaults.conf

Edit the file and add a line.
spark.executor.uri http://m1/apps/spark.tgz

For on-premise I used my hostname. For Azure I used the IP of one of my master servers.

Create the tgz file
$ tar cvzf spark.tgz spark/*

The archive keeps the top-level spark folder, which the Marathon app command below relies on ($MESOS_SANDBOX/spark/bin/spark-submit).

Move the spark.tgz to the server that will act as your web server.  The master on a single-node on-premise install, or one of the masters in a cluster, will work.

You can use the DCOS web server to host the files.

# mkdir /opt/mesosphere/active/dcos-ui/user/apps/
# cp spark.tgz /opt/mesosphere/active/dcos-ui/user/apps/

From your nodes you should be able to access the file.

For example:
$ curl -O http://m1/apps/spark.tgz
This should retrieve the file.

You'll also need to copy rtsink-jar-with-dependencies.jar to the same location, as you did for spark.tgz.

For example:
$ curl -O http://m1/apps/rtsink-jar-with-dependencies.jar
This should retrieve the file.

Now we'll create the Marathon App.

Click on Create Application.

Click on JSON Mode.


Then cut and paste this JSON.

{
  "id": "/kafka-spark-cnt",
  "cmd": "$MESOS_SANDBOX/spark/bin/spark-submit --class com.esri.rtsink.scala.KafkaSparkCnt --conf spark.cores.max=2 --master mesos://leader.mesos:5050 $MESOS_SANDBOX/rtsink-jar-with-dependencies.jar a1:9092 simFile2 group2 1 $PORT0",
  "cpus": 1,
  "mem": 1024,
  "disk": 0,
  "instances": 0,
  "constraints": [
    [
      "hostname",
      "UNIQUE"
    ]
  ],
  "healthChecks": [
    {
      "path": "/",
      "protocol": "HTTP",
      "portIndex": 0,
      "gracePeriodSeconds": 300,
      "intervalSeconds": 60,
      "timeoutSeconds": 20,
      "maxConsecutiveFailures": 3,
      "ignoreHttp1xx": false
    }
  ],
  "uris": [
    "http://m1/apps/spark.tgz",
    "http://m1/apps/rtsink-jar-with-dependencies.jar"
  ]
}




Click Create Application

Scale the application to 1 instance.

On the Mesos page, under Active Tasks, you should see the kafka-spark-cnt task start; it will then spawn a worker task with ID "0".


You can verify the worker node uses 2 CPUs, as requested with spark.cores.max on the command line.

Click on Sandbox for the kafka-spark-cnt task and open stdout.  This is the same output we saw in the terminal when running locally.

Now you can run the Simulator again to see the counts come through.

$ java -cp Simulator-jar-with-dependencies.jar com.esri.simulator.Kafka a1:9092 simFile2 ../simFile_1000_10s.json 100 1000

You'll see the results in the stdout page.


You can also get to the health and count pages using the Mesos DNS name.

You'll need the port number from the Marathon instance.


In this case it's 18039.

NOTE: To install jq on CentOS:
# yum install epel-release
# yum install jq

$ curl -s kafka-spark-cnt.marathon.mesos:18039/count | jq "."

{
  "latencies": [],
  "counts": [
    1000
  ],
  "rates": [
    99.94003597841295
  ],
  "tm": 1472839239012
}








