
Evaluation: Apache Spark

These are my notes from a presentation I did for class. A link to the sample code can be found near the end.

I used the instructions from Learning Spark (see references) to set up the Standalone cluster manager with three nodes. I installed Oracle's Java 1.8 on each machine and set the JAVA_HOME variable, both from the command line and in the .bash_profile file. I copied the CDH4 precompiled version of Spark to each machine, and I modified the /etc/hosts file to include an entry for each machine:

127.0.0.1       localhost       localhost.localdomain
 172.16.3.153    sparkserver001
 172.16.3.185    sparkserver002
 172.16.3.187    sparkserver003

I set up passwordless ssh between the master and the two workers by adding keys to each machine's ‘authorized_keys’ file.

The CDH4 version complained on the clients about not finding the native Hadoop libraries, so I tried compiling the generic version of Spark. I saw the POM file and assumed Maven was the way to build it, forgetting about the ‘sbt’ approach discussed in the section we covered in class. Maven ran forever and never seemed to finish, so I reran the compilation with ‘sbt’.

I found an easier way to install a cluster in Learning Spark (see references). Following the instructions in the Running a Cluster chapter, I did the following:

  1. Set up a ‘hadoop’ user through which to run everything
  2. Generated ssh keys for each server. Copied the keys across servers and added them to the ‘authorized_keys’ file.
  3. Edited the conf/slaves file on the master node and added the hostnames of all the nodes to it:
[root@SparkServer001 ~]# cat /opt/spark/conf/slaves
 # A Spark Worker will be started on each of the machines listed below.
 localhost
 sparkserver002
 sparkserver003

I tested the setup by running a modified version of the examples script from Learning Spark. I modified it to strip out the parts that relied on extra libraries, to simplify the process a bit. Here’s the modified script I used:

#!/usr/bin/env bash
# This script is used to run all of the examples. Mostly to be used by travis for testing
# Output the commands we run
set -x
# If any command fails, fail
set -e
# Build everything
KAFKA_ROOT=./kafka_2.9.2-0.8.1.1
SPARK_HOME=/opt/spark
SPARK_SUBMIT_SCRIPT="$SPARK_HOME/bin/spark-submit"
ASSEMBLY_JAR=./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar
# Mini cleanup - note: this fails the script if the directories don't exist. Just manually erase the ones that do
#hadoop fs -rm -R /tmp/py; hadoop fs -mkdir /tmp/py
#hadoop fs -rm -R /tmp/java; hadoop fs -mkdir /tmp/java
#hadoop fs -rm -R /tmp/scala; hadoop fs -mkdir /tmp/scala

# Scala
echo "Running Scala programs"
$SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.scala.LoadJsonWithSparkSQL $ASSEMBLY_JAR local ./files/pandainfo.json
TWITTER_DATA=./files/testweet.json
$SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.scala.SparkSQLTwitter  $ASSEMBLY_JAR  "$TWITTER_DATA"  /tmp/scala/tweetout
echo "Running Scala streaming program"
./bin/fakelogs.sh &
sleep 1
$SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.scala.StreamingLogInput $ASSEMBLY_JAR local[4]

# Python
echo "Running Python programs"

$SPARK_SUBMIT_SCRIPT ./src/python/AvgMapPartitions.py local
$SPARK_SUBMIT_SCRIPT ./src/python/BasicAvg.py local
$SPARK_SUBMIT_SCRIPT ./src/python/BasicFilterMap.py local
$SPARK_SUBMIT_SCRIPT ./src/python/BasicKeyValueMapFilter.py local
$SPARK_SUBMIT_SCRIPT ./src/python/BasicMapPartitions.py local
$SPARK_SUBMIT_SCRIPT ./src/python/BasicMap.py local
$SPARK_SUBMIT_SCRIPT ./src/python/SparkSQLTwitter.py ./files/testweet.json /tmp/py/tweetout
$SPARK_SUBMIT_SCRIPT ./src/python/LoadCsv.py local ./files/favourite_animals.csv /tmp/py/panda_lovers.csv
$SPARK_SUBMIT_SCRIPT ./src/python/MakeHiveTable.py local ./files/int_string.csv pandaplural
$SPARK_SUBMIT_SCRIPT ./src/python/LoadJson.py local ./files/pandainfo.json /tmp/py/loadjsonout
$SPARK_SUBMIT_SCRIPT ./src/python/PerKeyAvg.py local
$SPARK_SUBMIT_SCRIPT ./src/python/RemoveOutliers.py local
$SPARK_SUBMIT_SCRIPT ./src/python/WordCount.py local
$SPARK_SUBMIT_SCRIPT ./src/python/MakeParquetFile.py local ./files/favourite_animals.csv /tmp/py/favouriteanimal_parquet
$SPARK_SUBMIT_SCRIPT ./src/python/QueryParquetFile.py local /tmp/py/favouriteanimal_parquet
# Java
echo "Running Java programs"
$SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.java.LoadJsonWithSparkSQL $ASSEMBLY_JAR local ./files/pandainfo.json
./sbt/sbt assembly && $SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.java.SparkSQLTwitter  $ASSEMBLY_JAR  ./files/testweet.json  /tmp/java/tweetout

echo "Running Java streaming program"
./bin/fakelogs.sh &
sleep 1
$SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.java.StreamingLogInput $ASSEMBLY_JAR local[4]
sleep 5

echo "Done running all programs :)"

This ran as expected. The output below is truncated, since it is quite long; note, however, that the script halts if there are any problems along the way:

+ set -e
+ KAFKA_ROOT=./kafka_2.9.2-0.8.1.1
+ SPARK_HOME=/opt/spark
+ SPARK_SUBMIT_SCRIPT=/opt/spark/bin/spark-submit
+ ASSEMBLY_JAR=./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar
+ echo 'Running Scala programs'
Running Scala programs
+ /opt/spark/bin/spark-submit --class com.oreilly.learningsparkexamples.scala.LoadJsonWithSparkSQL ./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar local ./files/pandainfo.json

………..

+ echo 'Done running all programs :)'
Done running all programs :)
[hadoop@SparkServer001 learning-spark-examples]$ hadoop fs -ls /tmp
Found 2 items
drwxr-xr-x   - hadoop supergroup      	0 2015-05-10 10:05 /tmp/hive-hadoop
drwxr-xr-x   - hadoop supergroup      	0 2015-05-10 10:06 /tmp/py

I tried to use YARN with Spark, but I kept running into errors that I didn’t have time to debug. For example:

[hadoop@SparkServer001 learning-spark-examples]$ /opt/spark/bin/spark-submit --class com.oreilly.learningsparkexamples.java.LoadJsonWithSparkSQL ./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar yarn-client files/pandainfo.json
15/05/10 10:32:28 INFO spark.SparkContext: Running Spark version 1.3.1
15/05/10 10:32:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/05/10 10:32:28 INFO spark.SecurityManager: Changing view acls to: hadoop
15/05/10 10:32:28 INFO spark.SecurityManager: Changing modify acls to: hadoop
15/05/10 10:32:28 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/05/10 10:32:29 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/10 10:32:29 INFO Remoting: Starting remoting
15/05/10 10:32:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@sparkserver001:52966]
15/05/10 10:32:29 INFO util.Utils: Successfully started service 'sparkDriver' on port 52966.
15/05/10 10:32:29 INFO spark.SparkEnv: Registering MapOutputTracker
15/05/10 10:32:29 INFO spark.SparkEnv: Registering BlockManagerMaster
15/05/10 10:32:29 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-ee96c812-731e-476b-99e4-286ef69f6869/blockmgr-c1b224fa-8426-4ac6-9075-9e924d0edee0
15/05/10 10:32:29 INFO storage.MemoryStore: MemoryStore started with capacity 265.1 MB
15/05/10 10:32:30 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-6124584e-0e03-44ee-8b37-fd9f034b871b/httpd-bec65866-4f72-4291-b35d-edbd818f01b3
15/05/10 10:32:30 INFO spark.HttpServer: Starting HTTP Server
15/05/10 10:32:30 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/05/10 10:32:30 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:51934
15/05/10 10:32:30 INFO util.Utils: Successfully started service 'HTTP file server' on port 51934.
15/05/10 10:32:30 INFO spark.SparkEnv: Registering OutputCommitCoordinator
15/05/10 10:32:30 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/05/10 10:32:30 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/05/10 10:32:30 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
15/05/10 10:32:30 INFO ui.SparkUI: Started SparkUI at http://sparkserver001:4040
15/05/10 10:32:30 INFO spark.SparkContext: Added JAR file:/home/hadoop/learning-spark-examples/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar at http://172.16.3.153:51934/jars/learning-spark-examples-assembly-0.0.1.jar with timestamp 1431279150702
15/05/10 10:32:30 INFO client.RMProxy: Connecting to ResourceManager at SparkServer001/172.16.3.153:8032
15/05/10 10:32:31 INFO yarn.Client: Requesting a new application from cluster with 3 NodeManagers
15/05/10 10:32:31 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
15/05/10 10:32:31 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
15/05/10 10:32:31 INFO yarn.Client: Setting up container launch context for our AM
15/05/10 10:32:31 INFO yarn.Client: Preparing resources for our AM container
15/05/10 10:32:31 INFO yarn.Client: Uploading resource file:/opt/spark-1.3.1-bin-hadoop2.6/lib/spark-assembly-1.3.1-hadoop2.6.0.jar -> hdfs://sparkserver001:9000/user/hadoop/.sparkStaging/application_1430856908764_0013/spark-assembly-1.3.1-hadoop2.6.0.jar
15/05/10 10:32:32 INFO yarn.Client: Setting up the launch environment for our AM container
15/05/10 10:32:32 INFO spark.SecurityManager: Changing view acls to: hadoop
15/05/10 10:32:32 INFO spark.SecurityManager: Changing modify acls to: hadoop
15/05/10 10:32:32 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/05/10 10:32:32 INFO yarn.Client: Submitting application 13 to ResourceManager
15/05/10 10:32:32 INFO impl.YarnClientImpl: Submitted application application_1430856908764_0013
15/05/10 10:32:33 INFO yarn.Client: Application report for application_1430856908764_0013 (state: ACCEPTED)
15/05/10 10:32:33 INFO yarn.Client:
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1431279152748
     final status: UNDEFINED
     tracking URL: http://sparkserver001:8088/proxy/application_1430856908764_0013/
     user: hadoop
15/05/10 10:32:34 INFO yarn.Client: Application report for application_1430856908764_0013 (state: ACCEPTED)
15/05/10 10:32:35 INFO yarn.Client: Application report for application_1430856908764_0013 (state: ACCEPTED)
15/05/10 10:32:36 INFO yarn.Client: Application report for application_1430856908764_0013 (state: ACCEPTED)
15/05/10 10:32:37 INFO cluster.YarnClientSchedulerBackend: ApplicationMaster registered as Actor[akka.tcp://sparkYarnAM@sparkserver001:42980/user/YarnAM#-1479048841]
15/05/10 10:32:37 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> SparkServer001, PROXY_URI_BASES -> http://SparkServer001:8088/proxy/application_1430856908764_0013), /proxy/application_1430856908764_0013
15/05/10 10:32:37 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
15/05/10 10:32:37 INFO yarn.Client: Application report for application_1430856908764_0013 (state: RUNNING)
15/05/10 10:32:37 INFO yarn.Client:
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: sparkserver001
     ApplicationMaster RPC port: 0
     queue: default
     start time: 1431279152748
     final status: UNDEFINED
     tracking URL: http://sparkserver001:8088/proxy/application_1430856908764_0013/
     user: hadoop
15/05/10 10:32:37 INFO cluster.YarnClientSchedulerBackend: Application application_1430856908764_0013 has started running.
15/05/10 10:32:37 INFO netty.NettyBlockTransferService: Server created on 52776
15/05/10 10:32:37 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/05/10 10:32:37 INFO storage.BlockManagerMasterActor: Registering block manager sparkserver001:52776 with 265.1 MB RAM, BlockManagerId(<driver>, sparkserver001, 52776)
15/05/10 10:32:37 INFO storage.BlockManagerMaster: Registered BlockManager
15/05/10 10:32:39 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkYarnAM@sparkserver001:42980] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/05/10 10:32:42 INFO cluster.YarnClientSchedulerBackend: ApplicationMaster registered as Actor[akka.tcp://sparkYarnAM@sparkserver001:48603/user/YarnAM#-413459596]
15/05/10 10:32:42 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> SparkServer001, PROXY_URI_BASES -> http://SparkServer001:8088/proxy/application_1430856908764_0013), /proxy/application_1430856908764_0013
15/05/10 10:32:42 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
15/05/10 10:32:45 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkYarnAM@sparkserver001:48603] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
15/05/10 10:32:45 ERROR cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
15/05/10 10:32:45 INFO ui.SparkUI: Stopped Spark web UI at http://sparkserver001:4040
15/05/10 10:32:45 INFO scheduler.DAGScheduler: Stopping DAGScheduler
15/05/10 10:32:45 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
15/05/10 10:32:45 INFO cluster.YarnClientSchedulerBackend: Asking each executor to shut down
15/05/10 10:32:45 INFO cluster.YarnClientSchedulerBackend: Stopped
15/05/10 10:32:45 INFO spark.MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
15/05/10 10:32:46 INFO storage.MemoryStore: MemoryStore cleared
15/05/10 10:32:46 INFO storage.BlockManager: BlockManager stopped
15/05/10 10:32:46 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
15/05/10 10:32:46 INFO spark.SparkContext: Successfully stopped SparkContext
15/05/10 10:32:46 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor: OutputCommitCoordinator stopped!
15/05/10 10:32:46 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/05/10 10:32:46 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/05/10 10:32:46 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/05/10 10:33:00 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:544)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:154)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:169)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:67)
    at com.oreilly.learningsparkexamples.java.LoadJsonWithSparkSQL.main(LoadJsonWithSparkSQL.java:34)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[hadoop@SparkServer001 learning-spark-examples]$

I found that this issue was due to problems in the config file on one of the nodes. Fixing that got me past the error.

I then wrote sample code matching the purpose of the samples I wrote for each of the other technologies: a streaming job that pulls airport status data from the FAA web service. But this version didn’t like the URLs I was using:

java.net.UnknownHostException: http://services.faa.gov/airport/status/BWI?format=application/json
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at java.net.Socket.connect(Socket.java:538)
    at java.net.Socket.<init>(Socket.java:434)
    at java.net.Socket.<init>(Socket.java:211)
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:71)
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:57)
15/05/10 12:52:06 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiv
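
In hindsight the error makes sense: Spark Streaming’s socketTextStream() opens a plain TCP connection to a host and port; it doesn’t fetch HTTP URLs, so handing it the FAA endpoint produced the UnknownHostException above. Here’s a minimal sketch of the receiving side, assuming roughly the same wiring as my FaaFlightDelayStream job (the class name and details are illustrative, not the code from the repository):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class FaaStreamSketch {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("FaaFlightDelayStream");
        // One-second batches, matching the batch times visible in the logs below
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

        // socketTextStream connects to a host and port over plain TCP;
        // it will not fetch an HTTP URL, which is why the FAA endpoint failed above
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 8854);

        // The real job maps each JSON record into an AirportStatus object at this point;
        // printing the raw lines is enough to show the wiring
        lines.print();

        jssc.start();
        jssc.awaitTermination();
    }
}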

So I wrote a quick Python server script that actually fetches the data and passes it along on a local port, based on something I found here: http://www.binarytides.com/python-socket-server-code-example

My version:

'''
Simple socket server using threads
'''

import socket
import sys
import urllib2
from thread import start_new_thread

HOST = ''   # Symbolic name meaning all available interfaces
PORT = 8854 # Arbitrary non-privileged port

count = 0
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print 'Socket created'

# Bind socket to local host and port
try:
    s.bind((HOST, PORT))
except socket.error as msg:
    print 'Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
    sys.exit()

print 'Socket bind complete'

# Start listening on socket
s.listen(10)
print 'Socket now listening'

# Function for handling connections. This will be used to create threads
def clientthread(conn):
    global count
    airportCodes = ['SFO','BWI','MIA','ORD','ATL','LAX','BOS','STL','DFW','DEN','SEA','CLT','JFK']

    # Build the URL for the next airport, fetch its status and relay it to the client
    nextString = "http://services.faa.gov/airport/status/" + airportCodes[count] + "?format=application/json"
    print nextString
    response = urllib2.urlopen(nextString)
    reply = response.read()

    conn.sendall(reply)

    # Cycle through the airport codes
    count += 1
    if count == len(airportCodes):
        count = 0

    conn.close()

# Now keep talking with the client
while 1:
    # Wait to accept a connection - blocking call
    conn, addr = s.accept()
    print 'Connected with ' + addr[0] + ':' + str(addr[1])

    # start_new_thread takes the function to run as its first argument
    # and a tuple of arguments to that function as its second
    start_new_thread(clientthread, (conn,))

s.close()

That seemed to do the trick. I’m now ingesting data:

Server side (sample):

http://services.faa.gov/airport/status/DFW?format=application/json
Connected with 127.0.0.1:33389
http://services.faa.gov/airport/status/DEN?format=application/json
Connected with 127.0.0.1:33391
http://services.faa.gov/airport/status/SEA?format=application/json
Connected with 127.0.0.1:33393
http://services.faa.gov/airport/status/CLT?format=application/json
Connected with 127.0.0.1:33395
http://services.faa.gov/airport/status/JFK?format=application/json
Connected with 127.0.0.1:33397
http://services.faa.gov/airport/status/SFO?format=application/json
Connected with 127.0.0.1:33399
http://services.faa.gov/airport/status/BWI?format=application/json
Connected with 127.0.0.1:33402
http://services.faa.gov/airport/status/MIA?format=application/json
Connected with 127.0.0.1:33404
http://services.faa.gov/airport/status/ORD?format=application/json
Connected with 127.0.0.1:33406
http://services.faa.gov/airport/status/ATL?format=application/json
Connected with 127.0.0.1:33408
http://services.faa.gov/airport/status/LAX?format=application/json
Connected with 127.0.0.1:33410
http://services.faa.gov/airport/status/BOS?format=application/json
Connected with 127.0.0.1:33412
http://services.faa.gov/airport/status/STL?format=application/json
Connected with 127.0.0.1:33414
http://services.faa.gov/airport/status/DFW?format=application/json
Connected with 127.0.0.1:33416
http://services.faa.gov/airport/status/DEN?format=application/json
Connected with 127.0.0.1:33418
http://services.faa.gov/airport/status/SEA?format=application/json
Connected with 127.0.0.1:33420
http://services.faa.gov/airport/status/CLT?format=application/json
Connected with 127.0.0.1:33422
http://services.faa.gov/airport/status/JFK?format=application/json
Connected with 127.0.0.1:33424
http://services.faa.gov/airport/status/SFO?format=application/json
Connected with 127.0.0.1:33426
http://services.faa.gov/airport/status/BWI?format=application/json
Connected with 127.0.0.1:33428
http://services.faa.gov/airport/status/MIA?format=application/json
Connected with 127.0.0.1:33430
http://services.faa.gov/airport/status/ORD?format=application/json
Connected with 127.0.0.1:33432
http://services.faa.gov/airport/status/ATL?format=application/json
Connected with 127.0.0.1:33434
http://services.faa.gov/airport/status/LAX?format=application/json
Connected with 127.0.0.1:33436
http://services.faa.gov/airport/status/BOS?format=application/json
Connected with 127.0.0.1:33438
http://services.faa.gov/airport/status/STL?format=application/json
Connected with 127.0.0.1:33440
http://services.faa.gov/airport/status/DFW?format=application/json
Connected with 127.0.0.1:33442
http://services.faa.gov/airport/status/DEN?format=application/json
Connected with 127.0.0.1:33444
http://services.faa.gov/airport/status/SEA?format=application/json
Connected with 127.0.0.1:33446
http://services.faa.gov/airport/status/CLT?format=application/json
Connected with 127.0.0.1:33448
http://services.faa.gov/airport/status/JFK?format=application/json
Connected with 127.0.0.1:33450
http://services.faa.gov/airport/status/SFO?format=application/json
Connected with 127.0.0.1:33452
http://services.faa.gov/airport/status/BWI?format=application/json
Connected with 127.0.0.1:33454
http://services.faa.gov/airport/status/MIA?format=application/json
Connected with 127.0.0.1:33457
http://services.faa.gov/airport/status/ORD?format=application/json
Connected with 127.0.0.1:33459
http://services.faa.gov/airport/status/ATL?format=application/json
Connected with 127.0.0.1:33461
http://services.faa.gov/airport/status/LAX?format=application/json
Connected with 127.0.0.1:33463
http://services.faa.gov/airport/status/BOS?format=application/json
Connected with 127.0.0.1:33465
http://services.faa.gov/airport/status/STL?format=application/json
Connected with 127.0.0.1:33467
http://services.faa.gov/airport/status/DFW?format=application/json
Connected with 127.0.0.1:33469
http://services.faa.gov/airport/status/DEN?format=application/json
Connected with 127.0.0.1:33471
http://services.faa.gov/airport/status/SEA?format=application/json
Connected with 127.0.0.1:33473
http://services.faa.gov/airport/status/CLT?format=application/json

Client side (sample):

15/05/10 14:23:09 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 39.0 (TID 39) in 3 ms on localhost (1/1)
15/05/10 14:23:09 INFO scheduler.DAGScheduler: Stage 39 (print at FaaFlightDelayStream.java:125) finished in 0.003 s
15/05/10 14:23:09 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 39.0, whose tasks have all completed, from pool
15/05/10 14:23:09 INFO scheduler.DAGScheduler: Job 39 finished: print at FaaFlightDelayStream.java:125, took 0.009611 s
-------------------------------------------
Time: 1431292989000 ms
-------------------------------------------
edu.hu.faa.spark.AirportStatus@3e6c76b2

15/05/10 14:23:09 INFO scheduler.JobScheduler: Finished job streaming job 1431292989000 ms.0 from job set of time 1431292989000 ms
15/05/10 14:23:09 INFO scheduler.JobScheduler: Total delay: 0.015 s for time 1431292989000 ms (execution: 0.011 s)
15/05/10 14:23:09 INFO rdd.MapPartitionsRDD: Removing RDD 190 from persistence list
15/05/10 14:23:09 INFO storage.BlockManager: Removing RDD 190
15/05/10 14:23:09 INFO rdd.BlockRDD: Removing RDD 189 from persistence list
15/05/10 14:23:09 INFO storage.BlockManager: Removing RDD 189
15/05/10 14:23:09 INFO dstream.SocketInputDStream: Removing blocks of RDD BlockRDD[189] at socketTextStream at FaaFlightDelayStream.java:67 of time 1431292989000 ms
15/05/10 14:23:09 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1431292987000 ms)
-------------------------------------------15/05/10 14:23:10 INFO scheduler.JobScheduler: Starting job streaming job 1431292990000 ms.0 from job set of time 1431292990000 ms

Time: 1431292990000 ms
-------------------------------------------
15/05/10 14:23:10 INFO scheduler.JobScheduler: Added jobs for time 1431292990000 ms

15/05/10 14:23:10 INFO scheduler.JobScheduler: Finished job streaming job 1431292990000 ms.0 from job set of time 1431292990000 ms
15/05/10 14:23:10 INFO scheduler.JobScheduler: Total delay: 0.004 s for time 1431292990000 ms (execution: 0.000 s)
15/05/10 14:23:10 INFO rdd.MapPartitionsRDD: Removing RDD 192 from persistence list
15/05/10 14:23:10 INFO storage.BlockManager: Removing RDD 192
15/05/10 14:23:10 INFO rdd.BlockRDD: Removing RDD 191 from persistence list
15/05/10 14:23:10 INFO storage.BlockManager: Removing RDD 191
15/05/10 14:23:10 INFO dstream.SocketInputDStream: Removing blocks of RDD BlockRDD[191] at socketTextStream at FaaFlightDelayStream.java:67 of time 1431292990000 ms
15/05/10 14:23:10 INFO storage.BlockManager: Removing block input-0-1431292988000
15/05/10 14:23:10 INFO storage.MemoryStore: Block input-0-1431292988000 of size 510 dropped from memory (free 277876600)
15/05/10 14:23:10 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1431292988000 ms)
15/05/10 14:23:10 INFO storage.BlockManagerInfo: Removed input-0-1431292988000 on localhost:53643 in memory (size: 510.0 B, free: 265.1 MB)
15/05/10 14:23:10 INFO storage.BlockManagerMaster: Updated info of block input-0-1431292988000
15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Starting receiver again
15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Starting receiver
15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart
15/05/10 14:23:10 INFO dstream.SocketReceiver: Connecting to localhost:8854
15/05/10 14:23:10 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Receiver started again
15/05/10 14:23:10 INFO dstream.SocketReceiver: Connected to localhost:8854
15/05/10 14:23:10 INFO dstream.SocketReceiver: Stopped receiving
15/05/10 14:23:10 INFO dstream.SocketReceiver: Closed socket to localhost:8854
15/05/10 14:23:10 WARN receiver.ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Retrying connecting to localhost:8854
15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Retrying connecting to localhost:8854:
15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop
15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
15/05/10 14:23:10 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Retrying connecting to localhost:8854
15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Stopped receiver 0
15/05/10 14:23:10 INFO storage.MemoryStore: ensureFreeSpace(517) called with curMem=142840, maxMem=278019440
15/05/10 14:23:10 INFO storage.MemoryStore: Block input-0-1431292990400 stored as bytes in memory (estimated size 517.0 B, free 265.0 MB)
15/05/10 14:23:10 INFO storage.BlockManagerInfo: Added input-0-1431292990400 in memory on localhost:53643 (size: 517.0 B, free: 265.1 MB)
15/05/10 14:23:10 INFO storage.BlockManagerMaster: Updated info of block input-0-1431292990400
15/05/10 14:23:10 INFO receiver.BlockGenerator: Pushed block input-0-1431292990400
15/05/10 14:23:11 INFO scheduler.JobScheduler: Added jobs for time 1431292991000 ms
15/05/10 14:23:11 INFO scheduler.JobScheduler: Starting job streaming job 1431292991000 ms.0 from job set of time 1431292991000 ms
15/05/10 14:23:11 INFO spark.SparkContext: Starting job: print at FaaFlightDelayStream.java:125
15/05/10 14:23:11 INFO scheduler.DAGScheduler: Got job 40 (print at FaaFlightDelayStream.java:125) with 1 output partitions (allowLocal=true)
15/05/10 14:23:11 INFO scheduler.DAGScheduler: Final stage: Stage 40(print at FaaFlightDelayStream.java:125)
15/05/10 14:23:11 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/05/10 14:23:11 INFO scheduler.DAGScheduler: Missing parents: List()
15/05/10 14:23:11 INFO scheduler.DAGScheduler: Submitting Stage 40 (MapPartitionsRDD[196] at map at FaaFlightDelayStream.java:71), which has no missing parents
15/05/10 14:23:11 INFO storage.MemoryStore: ensureFreeSpace(2024) called with curMem=143357, maxMem=278019440
15/05/10 14:23:11 INFO storage.MemoryStore: Block broadcast_40 stored as values in memory (estimated size 2024.0 B, free 265.0 MB)
15/05/10 14:23:11 INFO storage.MemoryStore: ensureFreeSpace(1472) called with curMem=145381, maxMem=278019440
15/05/10 14:23:11 INFO storage.MemoryStore: Block broadcast_40_piece0 stored as bytes in memory (estimated size 1472.0 B, free 265.0 MB)
15/05/10 14:23:11 INFO storage.BlockManagerInfo: Added broadcast_40_piece0 in memory on localhost:53643 (size: 1472.0 B, free: 265.1 MB)
15/05/10 14:23:11 INFO storage.BlockManagerMaster: Updated info of block broadcast_40_piece0
15/05/10 14:23:11 INFO spark.SparkContext: Created broadcast 40 from broadcast at DAGScheduler.scala:839
15/05/10 14:23:11 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 40 (MapPartitionsRDD[196] at map at FaaFlightDelayStream.java:71)
15/05/10 14:23:11 INFO scheduler.TaskSchedulerImpl: Adding task set 40.0 with 1 tasks
15/05/10 14:23:11 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 40.0 (TID 40, localhost, NODE_LOCAL, 1307 bytes)
15/05/10 14:23:11 INFO executor.Executor: Running task 0.0 in stage 40.0 (TID 40)
15/05/10 14:23:11 INFO storage.BlockManager: Found block input-0-1431292990400 locally
15/05/10 14:23:11 INFO spark.FaaFlightDelayStream: Record: {"delay":"false","IATA":"CLT","state":"North Carolina","name":"Charlotte Douglas International","weather":{"visibility":10.00,"weather":"Mostly Cloudy","meta":{"credit":"NOAA's National Weather Service","updated":"4:52 PM Local","url":"http://weather.gov/"},"temp":"79.0 F (26.1 C)","wind":"North at 12.7mph"},"ICAO":"KCLT","city":"Charlotte","status":{"reason":"No known delays for this airport.","closureBegin":"","endTime":"","minDelay":"","avgDelay":"","maxDelay":"","closureEnd":"","trend":"","type":""}}
15/05/10 14:23:11 INFO executor.Executor: Finished task 0.0 in stage 40.0 (TID 40). 937 bytes result sent to driver
15/05/10 14:23:11 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 40.0 (TID 40) in 4 ms on localhost (1/1)
15/05/10 14:23:11 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 40.0, whose tasks have all completed, from pool
15/05/10 14:23:11 INFO scheduler.DAGScheduler: Stage 40 (print at FaaFlightDelayStream.java:125) finished in 0.004 s
15/05/10 14:23:11 INFO scheduler.DAGScheduler: Job 40 finished: print at FaaFlightDelayStream.java:125, took 0.011392 s
-------------------------------------------
Time: 1431292991000 ms
-------------------------------------------
edu.hu.faa.spark.AirportStatus@6f857bb2

15/05/10 14:23:11 INFO scheduler.JobScheduler: Finished job streaming job 1431292991000 ms.0 from job set of time 1431292991000 ms
15/05/10 14:23:11 INFO scheduler.JobScheduler: Total delay: 0.016 s for time 1431292991000 ms (execution: 0.012 s)
15/05/10 14:23:11 INFO rdd.MapPartitionsRDD: Removing RDD 194 from persistence list
15/05/10 14:23:11 INFO storage.BlockManager: Removing RDD 194
15/05/10 14:23:11 INFO rdd.BlockRDD: Removing RDD 193 from persistence list
15/05/10 14:23:11 INFO storage.BlockManager: Removing RDD 193
15/05/10 14:23:11 INFO dstream.SocketInputDStream: Removing blocks of RDD BlockRDD[193] at socketTextStream at FaaFlightDelayStream.java:67 of time 1431292991000 ms
15/05/10 14:23:11 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1431292989000 ms)

I added a client portion that reads in the data and tallies how many times each airport reported delays over an hour-long window. Not that I’d want to keep this running for an hour, but I did want to keep the count active (a rough sketch of how a tally like that can be wired up follows the output). Here’s the output for that portion of the console log:

-------------------------------------------
Time: 1431301008000 ms
-------------------------------------------
(ORD,8)
(ATL,0)
(STL,0)
(CLT,0)
(LAX,0)
(SFO,0)
(BWI,0)
(DEN,0)
(DFW,0)
(MIA,0)
….
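
One way to get a running per-airport count like this is reduceByKeyAndWindow over the JSON stream shown earlier. Here is a sketch in the Java streaming API (the class name and the regex-based field extraction are illustrative, not the code from the repository, whose approach may differ):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class DelayTallySketch {

    private static final Pattern IATA = Pattern.compile("\"IATA\":\"(\\w+)\"");
    private static final Pattern DELAYED = Pattern.compile("\"delay\":\"true\"");

    public static JavaPairDStream<String, Integer> tally(JavaStreamingContext jssc,
                                                         JavaDStream<String> statusJson) {
        // Window operations with an inverse function need checkpointing;
        // this is what fills the /checkpoint directory listed below
        jssc.checkpoint("hdfs:///checkpoint");

        // Key each record by airport code, with value 1 if it reports a delay, 0 otherwise
        JavaPairDStream<String, Integer> delays = statusJson.mapToPair(
            new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String json) {
                    Matcher m = IATA.matcher(json);
                    String code = m.find() ? m.group(1) : "UNKNOWN";
                    int delayed = DELAYED.matcher(json).find() ? 1 : 0;
                    return new Tuple2<String, Integer>(code, delayed);
                }
            });

        // Running per-airport count over a sliding one-hour window, updated every batch
        return delays.reduceByKeyAndWindow(
            new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer a, Integer b) { return a + b; }
            },
            new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer a, Integer b) { return a - b; }
            },
            new Duration(60 * 60 * 1000),   // window length: one hour
            new Duration(1000));            // slide interval: one second
    }
}

Calling print() on the returned pair stream gives output like the (airport, count) pairs above.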

Here’s what the checkpoint directory I created looks like:

[hadoop@SparkServer001 FaaSpark]$ hadoop fs -ls /checkpoint
Found 12 items
drwxr-xr-x   - hadoop supergroup      	0 2015-05-10 16:36 /checkpoint/10f1ba86-b3c2-49cd-8432-5b0fc338eb08
-rw-r--r--   3 hadoop supergroup   	8054 2015-05-10 16:36 /checkpoint/checkpoint-1431301003000
-rw-r--r--   3 hadoop supergroup   	8054 2015-05-10 16:36 /checkpoint/checkpoint-1431301004000
-rw-r--r--   3 hadoop supergroup   	8060 2015-05-10 16:36 /checkpoint/checkpoint-1431301004000.bk
-rw-r--r--   3 hadoop supergroup   	8054 2015-05-10 16:36 /checkpoint/checkpoint-1431301005000
-rw-r--r--   3 hadoop supergroup   	8060 2015-05-10 16:36 /checkpoint/checkpoint-1431301005000.bk
-rw-r--r--   3 hadoop supergroup   	8054 2015-05-10 16:36 /checkpoint/checkpoint-1431301006000
-rw-r--r--   3 hadoop supergroup   	8060 2015-05-10 16:36 /checkpoint/checkpoint-1431301006000.bk
-rw-r--r--   3 hadoop supergroup   	8054 2015-05-10 16:36 /checkpoint/checkpoint-1431301007000
-rw-r--r--   3 hadoop supergroup   	8060 2015-05-10 16:36 /checkpoint/checkpoint-1431301007000.bk
-rw-r--r--   3 hadoop supergroup   	8060 2015-05-10 16:36 /checkpoint/checkpoint-1431301008000
-rw-r--r--   3 hadoop supergroup      	0 2015-05-10 16:36 /checkpoint/temp

The content of the files is binary, and I don’t remember off the top of my head which flag converts them to text for reading.

The code I wrote can be found here:

https://gitlab.com/amracel/FaaSpark

Threading in Spring

In order to know when a user requests a tarot reading, I needed to set up an email client to send a message to me. One of the joys of SMTP is that sending email is very straightforward, no matter what your programming language, and Java is no different. So I wrote code to send out the message. The problem was that the email was slow to send, which slowed the web page’s response time down too much. I needed a second thread to handle it in order to avoid the slowdown.
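
The sending side itself is simple enough. It looks something like this, using Spring’s mail support (the host, addresses, and class name here are placeholders, not my actual code):

import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSenderImpl;

public class ReadingNotifier {

    public void sendNotification(String requestDetails) {
        // Plain SMTP via Spring's mail support; host and addresses are placeholders
        JavaMailSenderImpl sender = new JavaMailSenderImpl();
        sender.setHost("smtp.example.com");

        SimpleMailMessage message = new SimpleMailMessage();
        message.setTo("me@example.com");
        message.setSubject("New tarot reading requested");
        message.setText(requestDetails);

        // send() blocks until the SMTP conversation finishes,
        // which is exactly what slowed the page response down
        sender.send(message);
    }
}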

I didn’t take the time to read the Spring MVC documentation on threading first. I just moved my email code to a class that implemented Runnable and tried to be done with it. Unfortunately, Spring MVC (or, I’m guessing, any other servlet framework) requires that all the threads be tied up in pretty bows before proceeding. So this method of sending the email didn’t help my timing issue: the response page still waited on the email thread to finish before going back to the user.

A bit more research brought me to Apache’s ActiveMQ, which was what I needed. It took care of all the heavy lifting for my producer/consumer pieces; I only needed to write those classes and call the ActiveMQ libraries to put it in place.
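
Roughly, the shape of it is a producer that drops the notification onto a queue and returns immediately, and a consumer that does the slow SMTP work on its own thread. A minimal sketch, assuming a JmsTemplate wired against an ActiveMQ connection factory (queue and class names are illustrative):

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.jms.core.JmsTemplate;

// Producer: called from the web controller; returns as soon as the message is queued
public class NotificationProducer {

    private final JmsTemplate jmsTemplate;

    public NotificationProducer(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void queueNotification(String requestDetails) {
        // Hands the message to the broker and returns right away,
        // so the web request no longer waits on SMTP
        jmsTemplate.convertAndSend("reading.requests", requestDetails);
    }
}

// Consumer: registered with a listener container and run on its listener thread
class NotificationConsumer implements MessageListener {

    public void onMessage(Message message) {
        try {
            String requestDetails = ((TextMessage) message).getText();
            // ...do the slow email send here, off the request thread...
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }
}

The consumer gets hooked up through a listener container (Spring’s DefaultMessageListenerContainer, for example) pointed at the same queue and the ActiveMQ connection factory.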

The one issue I had with the process, after my first foray into it, was that ActiveMQ needed somewhere on my file system to write its message store. It took a bit of digging to figure out how to change the default location to one that my server would allow.

I see that there’s also a way to use my database for that instead. That will be another refactoring for later.

Here are the links you need to use this library:

ActiveMQ home page: http://activemq.apache.org

Setup Information for Spring: http://activemq.apache.org/spring-support.html
Tutorial site (a bit out of date, since it covers version 3 and the current version is 5, but it will help you get started): http://javaboutique.internet.com/tutorials/activemq/