Evaluation: Apache Storm

Here are the notes from the recent evaluation I did of this product. The sample code I created can be found here:

https://gitlab.com/amracel/FaaStorm

I used the instructions found on the Apache Storm site for installing a multi-node cluster. I figured that was a better way to compare the Amazon Kinesis application to Storm.

  • I set up 3 nodes, each running CentOS 7.
  • I installed Zookeeper on each node, as per the instructions here: http://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html
  • I installed the .rpm version of Java on each box.
  • I didn’t experiment with the heap size, as the instructions suggest doing, but the default didn’t give me any problems.
  • Here is the configuration file I came up with, based on the zookeeper site recommendations:

    # The number of milliseconds of each tick
    tickTime=2000
    # The number of ticks that the initial
    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just
    # example sakes.
    dataDir=/var/lib/zookeeper
    # the port at which the clients will connect
    clientPort=2181
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    server.1=stormserver001:2888:3888
    server.2=stormserver002:2888:3888
    server.3=stormserver003:2888:3888
    
  • I also set up a ‘myid’ file on each server, as per the instructions; for example:
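
    # The myid file lives in the dataDir from zoo.cfg above; the number matches the server.N entries
    echo "1" > /var/lib/zookeeper/myid    # use "2" on stormserver002, "3" on stormserver003
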
  • I had already installed Java in the previous step, so I didn’t need to do that. And CentOS came with Python 2.x already installed.
  • I downloaded and extracted Storm on each machine into the /usr/lib directory, and added a soft link from
    /usr/lib/apache-storm-0.9.4 to /usr/lib/storm:
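
    ln -s /usr/lib/apache-storm-0.9.4 /usr/lib/storm
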
  • As per the instructions, my storm.yaml file had the following configuration:

    ########### These MUST be filled in for a storm configuration
    storm.zookeeper.servers:
    	- "stormserver001"
    	- "stormserver002"
    	- "stormserver003"
    
    storm.local.dir: "/mnt/storm"
    
    #
    nimbus.host: "stormserver001"
    
    supervisor.slots.ports:
       - 6700
       - 6701
       - 6702
    #
    
  • I tried running each command to start the servers from the command line.
  • On each box (note that this particular command launches the ZooKeeper client shell, which connects to the local server on port 2181; the server process itself runs QuorumPeerMain, as in the systemd unit further down):

    java -cp /usr/lib/zookeeper/zookeeper-3.4.6.jar:/usr/lib/zookeeper/lib/log4j-1.2.16.jar:/usr/lib/zookeeper/conf:/usr/lib/zookeeper/lib/jline-0.9.94.jar:/usr/lib/zookeeper/lib/slf4j-api-1.6.1.jar org.apache.zookeeper.ZooKeeperMain -server 127.0.0.1:2181
    

    On the master box:

    /usr/lib/storm/bin/storm nimbus
    /usr/lib/storm/bin/storm ui
    

    On the supervisor boxes:

    /usr/lib/storm/bin/storm supervisor
    

    The logs from the command line showed that the boxes couldn’t see each other. I realized I hadn’t opened up the ports between the boxes. After much trial and error, I ended up with the following ports opened up:

    [root@StormServer001 conf]# firewall-cmd --list-ports
    3888/tcp 2181/tcp 8080/tcp 2888/tcp 6627/tcp
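
    For reference, ports like these can be opened with firewalld's permanent rules plus a reload; for example:

    firewall-cmd --permanent --add-port=2181/tcp
    firewall-cmd --permanent --add-port=2888/tcp
    firewall-cmd --permanent --add-port=3888/tcp
    firewall-cmd --permanent --add-port=6627/tcp
    firewall-cmd --permanent --add-port=8080/tcp
    firewall-cmd --reload
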
    

    8080 was only needed for the master server, to allow access to the web interface. At this point, I could check the web interface and see that everything was running:

    [Screenshot: Storm web UI (StormInterface)]

    I downloaded some sample code from:

    https://github.com/nathanmarz/storm-starter

    and ran one of the samples to confirm that the system was now working. The screenshot above shows 15 executors and 15 tasks continuously running for the sample topology.

    The last part of the setup process was to create systemd jobs that would run when the servers started. I created unit files similar to the following for each of the services (ZooKeeper, plus the Storm daemons):

    [Unit]
    Description=Start monitoring service zookeeper.
    
    [Service]
    Type=simple
    ExecStart=/usr/bin/java -cp /usr/lib/zookeeper/zookeeper-3.4.6.jar:/usr/lib/zookeeper/lib/log4j-1.2.16.jar:/usr/lib/zookeeper/lib/slf4j-api-1.6.1.jar:/usr/lib/zookeeper/lib/slf4j-log4j12-1.6.1.jar:/usr/lib/zookeeper/conf org.apache.zookeeper.server.quorum.QuorumPeerMain /usr/lib/zookeeper/conf/zoo.cfg
    
    [Install]
    WantedBy=multi-user.target
    

    I ran the necessary commands to allow these to start upon boot:

    	systemctl start [unit name]
    	systemctl enable [unit name]
    

    I created the producer side of the sample code. It was pretty straightforward to get it to work.
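
    The heart of it is the topology wiring. Here is a minimal sketch (the spout and bolt class names are stand-ins matching the component names in the logs; the actual code is in the repository linked above):

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;

    public class FaaTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // Spout polls the FAA airport status service and emits raw JSON strings.
            builder.setSpout("read-status", new ReadStatusSpout(), 1);
            // Bolt parses each JSON string into an AirportStatus POJO.
            builder.setBolt("process-json", new ProcessJsonBolt(), 1)
                   .shuffleGrouping("read-status");

            Config conf = new Config();
            conf.setDebug(true);

            // Run in-process; StormSubmitter.submitTopology() deploys to the cluster instead.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("faa-status", conf, builder.createTopology());
        }
    }

    Here’s a sample of the output: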

    876891 [Thread-9-process-json] INFO  backtype.storm.daemon.executor - Processing received message source: read-status:3, stream: default, id: {}, [{"delay":"false","IATA":"STL","state":"Missouri","name":"Lambert-St Louis International","weather":{"visibility":10.00,"weather":"Mostly Cloudy","meta":{"credit":"NOAA's National Weather Service","updated":"9:51 AM Local","url":"http://weather.gov/"},"temp":"79.0 F (26.1 C)","wind":"South at 11.5mph"},"ICAO":"KSTL","city":"St Louis","status":{"reason":"No known delays for this airport.","closureBegin":"","endTime":"","minDelay":"","avgDelay":"","maxDelay":"","closureEnd":"","trend":"","type":""}} ]
    876892 [Thread-9-process-json] INFO  backtype.storm.daemon.task - Emitting: process-json default [edu.hu.faa.storm.pojos.AirportStatus@9067678]
    878091 [Thread-11-read-status] INFO  backtype.storm.daemon.task - Emitting: read-status default [{"delay":"false","IATA":"DFW","state":"Texas","name":"Dallas/Ft Worth International","weather":{"visibility":7.00,"weather":"Light Rain","meta":{"credit":"NOAA's National Weather Service","updated":"9:53 AM Local","url":"http://weather.gov/"},"temp":"69.0 F (20.6 C)","wind":"East at 9.2mph"},"ICAO":"KDFW","city":"Dallas-Ft Worth","status":{"reason":"No known delays for this airport.","closureBegin":"","endTime":"","minDelay":"","avgDelay":"","maxDelay":"","closureEnd":"","trend":"","type":""}} ]
    878092 [Thread-9-process-json] INFO  backtype.storm.daemon.executor - Processing received message source: read-status:3, stream: default, id: {}, [{"delay":"false","IATA":"DFW","state":"Texas","name":"Dallas/Ft Worth International","weather":{"visibility":7.00,"weather":"Light Rain","meta":{"credit":"NOAA's National Weather Service","updated":"9:53 AM Local","url":"http://weather.gov/"},"temp":"69.0 F (20.6 C)","wind":"East at 9.2mph"},"ICAO":"KDFW","city":"Dallas-Ft Worth","status":{"reason":"No known delays for this airport.","closureBegin":"","endTime":"","minDelay":"","avgDelay":"","maxDelay":"","closureEnd":"","trend":"","type":""}} ]
    878092 [Thread-9-process-json] INFO  backtype.storm.daemon.task - Emitting: process-json default [edu.hu.faa.storm.pojos.AirportStatus@a6629c6]
    879322 [Thread-11-read-status] INFO  backtype.storm.daemon.task - Emitting: read-status default [{"delay":"false","IATA":"DEN","state":"Colorado","name":"Denver International","weather":{"visibility":10.00,"weather":"Overcast","meta":{"credit":"NOAA's National Weather Service","updated":"8:53 AM Local","url":"http://weather.gov/"},"temp":"45.0 F (7.2 C)","wind":"North at 3.5mph"},"ICAO":"KDEN","city":"Denver","status":{"reason":"No known delays for this airport.","closureBegin":"","endTime":"","minDelay":"","avgDelay":"","maxDelay":"","closureEnd":"","trend":"","type":""}} ]
    879322 [Thread-11-read-status] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
    879322 [Thread-9-process-json] INFO  backtype.storm.daemon.executor - Processing received message source: read-status:3, stream: default, id: {}, [{"delay":"false","IATA":"DEN","state":"Colorado","name":"Denver International","weather":{"visibility":10.00,"weather":"Overcast","meta":{"credit":"NOAA's National Weather Service","updated":"8:53 AM Local","url":"http://weather.gov/"},"temp":"45.0 F (7.2 C)","wind":"North at 3.5mph"},"ICAO":"KDEN","city":"Denver","status":{"reason":"No known delays for this airport.","closureBegin":"","endTime":"","minDelay":"","avgDelay":"","maxDelay":"","closureEnd":"","trend":"","type":""}} ]
    879323 [Thread-9-process-json] INFO  backtype.storm.daemon.task - Emitting: process-json default [edu.hu.faa.storm.pojos.AirportStatus@6b3e99f1]
    880728 [Thread-11-read-status] INFO  backtype.storm.daemon.task - Emitting: read-status default [{"delay":"false","IATA":"SEA","state":"Washington","name":"Seattle-Tacoma International","weather":{"visibility":10.00,"weather":"A Few Clouds","meta":{"credit":"NOAA's National Weather Service","updated":"7:53 AM Local","url":"http://weather.gov/"},"temp":"50.0 F (10.0 C)","wind":"North at 0.0mph"},"ICAO":"KSEA","city":"Seattle","status":{"reason":"No known delays for this airport.","closureBegin":"","endTime":"","minDelay":"","avgDelay":"","maxDelay":"","closureEnd":"","trend":"","type":""}} ]
    880729 [Thread-9-process-json] INFO  backtype.storm.daemon.executor - Processing received message source: read-status:3, stream: default, id: {}, [{"delay":"false","IATA":"SEA","state":"Washington","name":"Seattle-Tacoma International","weather":{"visibility":10.00,"weather":"A Few Clouds","meta":{"credit":"NOAA's National Weather Service","updated":"7:53 AM Local","url":"http://weather.gov/"},"temp":"50.0 F (10.0 C)","wind":"North at 0.0mph"},"ICAO":"KSEA","city":"Seattle","status":{"reason":"No known delays for this airport.","closureBegin":"","endTime":"","minDelay":"","avgDelay":"","maxDelay":"","closureEnd":"","trend":"","type":""}} ]
    880729 [Thread-9-process-json] INFO  backtype.storm.daemon.task - Emitting: process-json default [edu.hu.faa.storm.pojos.AirportStatus@731ca39]
    

    I then added the consumer, which counts the number of times each airport reported delays.
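
    The counting bolt is just a per-airport tally. Here is a minimal sketch, assuming the 0.9.x backtype.storm BaseBasicBolt API (the real DelayCountBolt is in the repository):

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;

    public class DelayCountBolt extends BaseBasicBolt {
        // Counts live in the bolt instance, so they are per-executor.
        private Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String airport = tuple.getString(0); // e.g. "ORD", emitted by process-json
            Integer count = counts.get(airport);
            counts.put(airport, count == null ? 1 : count + 1);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Terminal bolt: nothing is emitted downstream.
        }
    }

    Here’s the resulting output: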

    54584 [Thread-11-process-json] INFO  backtype.storm.daemon.executor - Processing received message source: read-status:4, stream: default, id: {}, [{"delay":"true","IATA":"ORD","state":"Illinois","name":"Chicago OHare International","weather":{"visibility":0.50,"weather":"Fog","meta":{"credit":"NOAA's National Weather Service","updated":"8:51 PM Local","url":"http://weather.gov/"},"temp":"47.0 F (8.3 C)","wind":"Northeast at 8.1mph"},"ICAO":"KORD","city":"Chicago","status":{"reason":"VOL:Multi-taxi","closureBegin":"","endTime":"","minDelay":"16 minutes","avgDelay":"","maxDelay":"30 minutes","closureEnd":"","trend":"Increasing","type":"Departure"}} ]
    54584 [Thread-11-process-json] INFO  backtype.storm.daemon.task - Emitting: process-json default [ORD]
    54585 [Thread-9-delays] INFO  backtype.storm.daemon.executor - Processing received message source: process-json:3, stream: default, id: {}, [ORD]
    54585 [Thread-9-delays] INFO  edu.hu.faa.storm.bolt.DelayCountBolt - ORD
    54585 [Thread-9-delays] INFO  edu.hu.faa.storm.bolt.DelayCountBolt - +++ORD: 4+++
    54585 [Thread-9-delays] INFO  edu.hu.faa.storm.bolt.DelayCountBolt - +++LAX: 0+++
    54585 [Thread-9-delays] INFO  edu.hu.faa.storm.bolt.DelayCountBolt - +++MIA: 0+++
    54585 [Thread-9-delays] INFO  edu.hu.faa.storm.bolt.DelayCountBolt - +++DFW: 0+++
    54585 [Thread-9-delays] INFO  edu.hu.faa.storm.bolt.DelayCountBolt - +++BWI: 0+++
    54585 [Thread-9-delays] INFO  edu.hu.faa.storm.bolt.DelayCountBolt - +++BOS: 0+++
    54585 [Thread-9-delays] INFO  edu.hu.faa.storm.bolt.DelayCountBolt - +++ATL: 0+++
    54585 [Thread-9-delays] INFO  edu.hu.faa.storm.bolt.DelayCountBolt - +++STL: 0+++
    54585 [Thread-9-delays] INFO  edu.hu.faa.storm.bolt.DelayCountBolt - +++CLT: 0+++
    54586 [Thread-9-delays] INFO  edu.hu.faa.storm.bolt.DelayCountBolt - +++DEN: 0+++
    54586 [Thread-9-delays] INFO  edu.hu.faa.storm.bolt.DelayCountBolt - +++SFO: 0+++
    54586 [Thread-9-delays] INFO  edu.hu.faa.storm.bolt.DelayCountBolt - +++SEA: 0+++
    55760 [Thread-13-read-status] INFO  backtype.storm.daemon.task - Emitting: read-status default [{"delay":"false","IATA":"ATL","state":"Georgia","name":"The William B Hartsfield International","weather":{"visibility":10.00,"weather":"A Few Clouds","meta":{"credit":"NOAA's National Weather Service","updated":"10:52 PM Local","url":"http://weather.gov/"},"temp":"78.0 F (25.6 C)","wind":"North at 0.0mph"},"ICAO":"KATL","city":"Atlanta","status":{"reason":"No known delays for this airport.","closureBegin":"","endTime":"","minDelay":"","avgDelay":"","maxDelay":"","closureEnd":"","trend":"","type":""}} ]
    

Evaluation: Apache Spark

    These are my notes from a presentation done for class. A link to sample code can be found near the end.

    I used the instructions from Learning Spark (see references) to load a Standalone Cluster Manager with 3 nodes. I loaded Oracle’s Java 1.8 on each machine and set the JAVA_HOME variable, both from the command line and in the .bash_profile file. I copied the CDH4 precompiled version to each machine. I modified the /etc/hosts file to include information on each machine:

    127.0.0.1       localhost       localhost.localdomain
    172.16.3.153    sparkserver001
    172.16.3.185    sparkserver002
    172.16.3.187    sparkserver003

    I set up passwordless ssh (via ‘authorized_keys’) between the master and the two workers.

    The CDH4 version, on the clients, complained about not finding native Hadoop libraries, so I tried compiling the generic version of Spark. I saw the POM file and assumed Maven was to be used, forgetting about the sbt build discussed in class. Maven ran forever and never seemed to finish compiling, so I reran the compilation with sbt.

    I then found an easier way to install a cluster in Learning Spark (see references). Following the instructions in the Running a Cluster chapter, I did the following:

    1. Set up a ‘hadoop’ user through which to run everything
    2. Generated ssh keys for each server. Copied the keys across servers and added them to the ‘authorized_keys’ file.
    3. Edited the conf/slaves file on the master node and added the hostnames of all the nodes to it:
    [root@SparkServer001 ~]# cat /opt/spark/conf/slaves
    # A Spark Worker will be started on each of the machines listed below.
    localhost
    sparkserver002
    sparkserver003
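
    With the slaves file in place, the whole standalone cluster can be started from the master using Spark's stock launcher script (and stopped with sbin/stop-all.sh):

    /opt/spark/sbin/start-all.sh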

    I tested the setup by running a modified version of the examples script from Learning Spark. I modified it to remove some extra libraries, in order to simplify the process a bit. Here’s the modified script I used:

    #!/usr/bin/env bash
    # This script is used to run all of the examples. Mostly to be used by travis for testing
    # Output the commands we run
    set -x
    # If any command fails, fail
    set -e
    # Build everything
    KAFKA_ROOT=./kafka_2.9.2-0.8.1.1
    SPARK_HOME=/opt/spark
    SPARK_SUBMIT_SCRIPT="$SPARK_HOME/bin/spark-submit"
    ASSEMBLY_JAR=./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar
    # Mini cleanup - note: this fails the script if the directories don't exist. Just manually erase the ones that do
    #hadoop fs -rm -R /tmp/py; hadoop fs -mkdir /tmp/py
    #hadoop fs -rm -R /tmp/java; hadoop fs -mkdir /tmp/java
    #hadoop fs -rm -R /tmp/scala; hadoop fs -mkdir /tmp/scala
    
    # Scala
    echo "Running Scala programs"
    $SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.scala.LoadJsonWithSparkSQL $ASSEMBLY_JAR local ./files/pandainfo.json
    TWITTER_DATA=./files/testweet.json
    $SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.scala.SparkSQLTwitter  $ASSEMBLY_JAR  "$TWITTER_DATA"  /tmp/scala/tweetout
    echo "Running Scala streaming program"
    ./bin/fakelogs.sh &
    sleep 1
    $SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.scala.StreamingLogInput $ASSEMBLY_JAR local[4]
    
    # Python
    echo "Running Python programs"
    
    $SPARK_SUBMIT_SCRIPT ./src/python/AvgMapPartitions.py local
    $SPARK_SUBMIT_SCRIPT ./src/python/BasicAvg.py local
    $SPARK_SUBMIT_SCRIPT ./src/python/BasicFilterMap.py local
    $SPARK_SUBMIT_SCRIPT ./src/python/BasicKeyValueMapFilter.py local
    $SPARK_SUBMIT_SCRIPT ./src/python/BasicMapPartitions.py local
    $SPARK_SUBMIT_SCRIPT ./src/python/BasicMap.py local
    $SPARK_SUBMIT_SCRIPT ./src/python/SparkSQLTwitter.py ./files/testweet.json /tmp/py/tweetout
    $SPARK_SUBMIT_SCRIPT ./src/python/LoadCsv.py local ./files/favourite_animals.csv /tmp/py/panda_lovers.csv
    $SPARK_SUBMIT_SCRIPT ./src/python/MakeHiveTable.py local ./files/int_string.csv pandaplural
    $SPARK_SUBMIT_SCRIPT ./src/python/LoadJson.py local ./files/pandainfo.json /tmp/py/loadjsonout
    $SPARK_SUBMIT_SCRIPT ./src/python/PerKeyAvg.py local
    $SPARK_SUBMIT_SCRIPT ./src/python/RemoveOutliers.py local
    $SPARK_SUBMIT_SCRIPT ./src/python/WordCount.py local
    $SPARK_SUBMIT_SCRIPT ./src/python/MakeParquetFile.py local ./files/favourite_animals.csv /tmp/py/favouriteanimal_parquet
    $SPARK_SUBMIT_SCRIPT ./src/python/QueryParquetFile.py local /tmp/py/favouriteanimal_parquet
    # Java
    echo "Running Java programs"
    $SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.java.LoadJsonWithSparkSQL $ASSEMBLY_JAR local ./files/pandainfo.json
    ./sbt/sbt assembly && $SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.java.SparkSQLTwitter  $ASSEMBLY_JAR  ./files/testweet.json  /tmp/java/tweetout
    
    echo "Running Java streaming program"
    ./bin/fakelogs.sh &
    sleep 1
    $SPARK_SUBMIT_SCRIPT --class com.oreilly.learningsparkexamples.java.StreamingLogInput $ASSEMBLY_JAR local[4]
    sleep 5
    
    echo "Done running all programs :)"
    

    This ran as expected. The output below is truncated, since it is quite long; note, however, that the script halts if anything fails along the way:

    + set -e
    + KAFKA_ROOT=./kafka_2.9.2-0.8.1.1
    + SPARK_HOME=/opt/spark
    + SPARK_SUBMIT_SCRIPT=/opt/spark/bin/spark-submit
    + ASSEMBLY_JAR=./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar
    + echo 'Running Scala programs'
    Running Scala programs
    + /opt/spark/bin/spark-submit --class com.oreilly.learningsparkexamples.scala.LoadJsonWithSparkSQL ./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar local ./files/pandainfo.json
    
    ………..
    
    + echo 'Done running all programs :)'
    Done running all programs :)
    [hadoop@SparkServer001 learning-spark-examples]$ hadoop fs -ls /tmp
    Found 2 items
    drwxr-xr-x   - hadoop supergroup      	0 2015-05-10 10:05 /tmp/hive-hadoop
    drwxr-xr-x   - hadoop supergroup      	0 2015-05-10 10:06 /tmp/py
    

    I tried to use Yarn with Spark, but I kept running into errors that I didn’t have time to debug. For example:

    [hadoop@SparkServer001 learning-spark-examples]$ /opt/spark/bin/spark-submit --class com.oreilly.learningsparkexamples.java.LoadJsonWithSparkSQL ./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar yarn-client files/pandainfo.json
    15/05/10 10:32:28 INFO spark.SparkContext: Running Spark version 1.3.1
    15/05/10 10:32:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    15/05/10 10:32:28 INFO spark.SecurityManager: Changing view acls to: hadoop
    15/05/10 10:32:28 INFO spark.SecurityManager: Changing modify acls to: hadoop
    15/05/10 10:32:28 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
    15/05/10 10:32:29 INFO slf4j.Slf4jLogger: Slf4jLogger started
    15/05/10 10:32:29 INFO Remoting: Starting remoting
    15/05/10 10:32:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@sparkserver001:52966]
    15/05/10 10:32:29 INFO util.Utils: Successfully started service 'sparkDriver' on port 52966.
    15/05/10 10:32:29 INFO spark.SparkEnv: Registering MapOutputTracker
    15/05/10 10:32:29 INFO spark.SparkEnv: Registering BlockManagerMaster
    15/05/10 10:32:29 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-ee96c812-731e-476b-99e4-286ef69f6869/blockmgr-c1b224fa-8426-4ac6-9075-9e924d0edee0
    15/05/10 10:32:29 INFO storage.MemoryStore: MemoryStore started with capacity 265.1 MB
    15/05/10 10:32:30 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-6124584e-0e03-44ee-8b37-fd9f034b871b/httpd-bec65866-4f72-4291-b35d-edbd818f01b3
    15/05/10 10:32:30 INFO spark.HttpServer: Starting HTTP Server
    15/05/10 10:32:30 INFO server.Server: jetty-8.y.z-SNAPSHOT
    15/05/10 10:32:30 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:51934
    15/05/10 10:32:30 INFO util.Utils: Successfully started service 'HTTP file server' on port 51934.
    15/05/10 10:32:30 INFO spark.SparkEnv: Registering OutputCommitCoordinator
    15/05/10 10:32:30 INFO server.Server: jetty-8.y.z-SNAPSHOT
    15/05/10 10:32:30 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
    15/05/10 10:32:30 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
    15/05/10 10:32:30 INFO ui.SparkUI: Started SparkUI at http://sparkserver001:4040
    15/05/10 10:32:30 INFO spark.SparkContext: Added JAR file:/home/hadoop/learning-spark-examples/./target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar at http://172.16.3.153:51934/jars/learning-spark-examples-assembly-0.0.1.jar with timestamp 1431279150702
    15/05/10 10:32:30 INFO client.RMProxy: Connecting to ResourceManager at SparkServer001/172.16.3.153:8032
    15/05/10 10:32:31 INFO yarn.Client: Requesting a new application from cluster with 3 NodeManagers
    15/05/10 10:32:31 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
    15/05/10 10:32:31 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
    15/05/10 10:32:31 INFO yarn.Client: Setting up container launch context for our AM
    15/05/10 10:32:31 INFO yarn.Client: Preparing resources for our AM container
    15/05/10 10:32:31 INFO yarn.Client: Uploading resource file:/opt/spark-1.3.1-bin-hadoop2.6/lib/spark-assembly-1.3.1-hadoop2.6.0.jar -> hdfs://sparkserver001:9000/user/hadoop/.sparkStaging/application_1430856908764_0013/spark-assembly-1.3.1-hadoop2.6.0.jar
    15/05/10 10:32:32 INFO yarn.Client: Setting up the launch environment for our AM container
    15/05/10 10:32:32 INFO spark.SecurityManager: Changing view acls to: hadoop
    15/05/10 10:32:32 INFO spark.SecurityManager: Changing modify acls to: hadoop
    15/05/10 10:32:32 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
    15/05/10 10:32:32 INFO yarn.Client: Submitting application 13 to ResourceManager
    15/05/10 10:32:32 INFO impl.YarnClientImpl: Submitted application application_1430856908764_0013
    15/05/10 10:32:33 INFO yarn.Client: Application report for application_1430856908764_0013 (state: ACCEPTED)
    15/05/10 10:32:33 INFO yarn.Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: default
         start time: 1431279152748
         final status: UNDEFINED
         tracking URL: http://sparkserver001:8088/proxy/application_1430856908764_0013/
         user: hadoop
    15/05/10 10:32:34 INFO yarn.Client: Application report for application_1430856908764_0013 (state: ACCEPTED)
    15/05/10 10:32:35 INFO yarn.Client: Application report for application_1430856908764_0013 (state: ACCEPTED)
    15/05/10 10:32:36 INFO yarn.Client: Application report for application_1430856908764_0013 (state: ACCEPTED)
    15/05/10 10:32:37 INFO cluster.YarnClientSchedulerBackend: ApplicationMaster registered as Actor[akka.tcp://sparkYarnAM@sparkserver001:42980/user/YarnAM#-1479048841]
    15/05/10 10:32:37 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> SparkServer001, PROXY_URI_BASES -> http://SparkServer001:8088/proxy/application_1430856908764_0013), /proxy/application_1430856908764_0013
    15/05/10 10:32:37 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
    15/05/10 10:32:37 INFO yarn.Client: Application report for application_1430856908764_0013 (state: RUNNING)
    15/05/10 10:32:37 INFO yarn.Client:
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: sparkserver001
         ApplicationMaster RPC port: 0
         queue: default
         start time: 1431279152748
         final status: UNDEFINED
         tracking URL: http://sparkserver001:8088/proxy/application_1430856908764_0013/
         user: hadoop
    15/05/10 10:32:37 INFO cluster.YarnClientSchedulerBackend: Application application_1430856908764_0013 has started running.
    15/05/10 10:32:37 INFO netty.NettyBlockTransferService: Server created on 52776
    15/05/10 10:32:37 INFO storage.BlockManagerMaster: Trying to register BlockManager
    15/05/10 10:32:37 INFO storage.BlockManagerMasterActor: Registering block manager sparkserver001:52776 with 265.1 MB RAM, BlockManagerId(, sparkserver001, 52776)
    15/05/10 10:32:37 INFO storage.BlockManagerMaster: Registered BlockManager
    15/05/10 10:32:39 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkYarnAM@sparkserver001:42980] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
    15/05/10 10:32:42 INFO cluster.YarnClientSchedulerBackend: ApplicationMaster registered as Actor[akka.tcp://sparkYarnAM@sparkserver001:48603/user/YarnAM#-413459596]
    15/05/10 10:32:42 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> SparkServer001, PROXY_URI_BASES -> http://SparkServer001:8088/proxy/application_1430856908764_0013), /proxy/application_1430856908764_0013
    15/05/10 10:32:42 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
    15/05/10 10:32:45 WARN remote.ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkYarnAM@sparkserver001:48603] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
    15/05/10 10:32:45 ERROR cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
    15/05/10 10:32:45 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
    15/05/10 10:32:45 INFO ui.SparkUI: Stopped Spark web UI at http://sparkserver001:4040
    15/05/10 10:32:45 INFO scheduler.DAGScheduler: Stopping DAGScheduler
    15/05/10 10:32:45 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
    15/05/10 10:32:45 INFO cluster.YarnClientSchedulerBackend: Asking each executor to shut down
    15/05/10 10:32:45 INFO cluster.YarnClientSchedulerBackend: Stopped
    15/05/10 10:32:45 INFO spark.MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
    15/05/10 10:32:46 INFO storage.MemoryStore: MemoryStore cleared
    15/05/10 10:32:46 INFO storage.BlockManager: BlockManager stopped
    15/05/10 10:32:46 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
    15/05/10 10:32:46 INFO spark.SparkContext: Successfully stopped SparkContext
    15/05/10 10:32:46 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor: OutputCommitCoordinator stopped!
    15/05/10 10:32:46 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
    15/05/10 10:32:46 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
    15/05/10 10:32:46 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
    15/05/10 10:33:00 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
    Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.SparkContext.(SparkContext.scala:544)
        at org.apache.spark.SparkContext.(SparkContext.scala:154)
        at org.apache.spark.SparkContext.(SparkContext.scala:169)
        at org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:67)
        at com.oreilly.learningsparkexamples.java.LoadJsonWithSparkSQL.main(LoadJsonWithSparkSQL.java:34)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    [hadoop@SparkServer001 learning-spark-examples]$
    

    I found that this issue was due to problems in the config file on one of the nodes. Fixing that got me past the error.

    I wrote the sample code that matches the purpose of the samples for the other technologies. But this version doesn’t like the URLs I’m using; Spark’s socket receiver expects a hostname and port, not a full HTTP URL:

    java.net.UnknownHostException: http://services.faa.gov/airport/status/BWI?format=application/json
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at java.net.Socket.connect(Socket.java:538)
        at java.net.Socket.(Socket.java:434)
        at java.net.Socket.(Socket.java:211)
        at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:71)
        at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:57)
    15/05/10 12:52:06 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiv
    

    I wrote a quick Python server script that actually reads the incoming data and passes it along on a local port, based on something I found here: http://www.binarytides.com/python-socket-server-code-example

    My version:

    '''
    Simple socket server using threads
    '''

    import socket
    import sys
    import urllib2
    from thread import start_new_thread

    HOST = ''   # Symbolic name meaning all available interfaces
    PORT = 8854 # Arbitrary non-privileged port

    # Airports to cycle through, one per client connection
    airportCodes = ['SFO','BWI','MIA','ORD','ATL','LAX','BOS','STL','DFW','DEN','SEA','CLT','JFK']
    count = 0

    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print 'Socket created'

    # Bind socket to local host and port
    try:
        s.bind((HOST, PORT))
    except socket.error as msg:
        print 'Bind failed. Error Code : ' + str(msg[0]) + ' Message ' + msg[1]
        sys.exit()

    print 'Socket bind complete'

    # Start listening on socket
    s.listen(10)
    print 'Socket now listening'

    # Function for handling connections. This will be used to create threads
    def clientthread(conn):
        global count

        # Fetch the status of the next airport in the rotation
        nextString = "http://services.faa.gov/airport/status/" + airportCodes[count] + "?format=application/json"
        print nextString
        response = urllib2.urlopen(nextString)
        reply = response.read()

        # Forward the JSON to the connected receiver, then advance the rotation
        conn.sendall(reply)
        count = (count + 1) % len(airportCodes)

        conn.close()

    # Now keep talking with the client
    while 1:
        # Wait to accept a connection - blocking call
        conn, addr = s.accept()
        print 'Connected with ' + addr[0] + ':' + str(addr[1])

        # start_new_thread takes the function to run and a tuple of its arguments
        start_new_thread(clientthread, (conn,))

    s.close()
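
    On the Spark side, the stream is consumed with socketTextStream (the log excerpts below show it at FaaFlightDelayStream.java:67). Here is a minimal sketch of that consuming side, assuming the Spark 1.3 Java streaming API; the class name and batch interval are illustrative, and the actual code is in the repository linked at the end:

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class SocketIngest {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf().setAppName("FaaFlightDelayStream");
            // 1-second batches, matching the timestamps in the log output below
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

            // socketTextStream takes a host and port, which is why the helper
            // server above republishes the FAA feed on localhost:8854
            JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 8854);
            lines.print();

            jssc.start();
            jssc.awaitTermination();
        }
    }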
    

    The server script seemed to do the trick. I’m now ingesting data:

    Server side (sample) –

    http://services.faa.gov/airport/status/DFW?format=application/json
    Connected with 127.0.0.1:33389
    http://services.faa.gov/airport/status/DEN?format=application/json
    Connected with 127.0.0.1:33391
    http://services.faa.gov/airport/status/SEA?format=application/json
    Connected with 127.0.0.1:33393
    http://services.faa.gov/airport/status/CLT?format=application/json
    Connected with 127.0.0.1:33395
    http://services.faa.gov/airport/status/JFK?format=application/json
    Connected with 127.0.0.1:33397
    http://services.faa.gov/airport/status/SFO?format=application/json
    Connected with 127.0.0.1:33399
    http://services.faa.gov/airport/status/BWI?format=application/json
    Connected with 127.0.0.1:33402
    http://services.faa.gov/airport/status/MIA?format=application/json
    Connected with 127.0.0.1:33404
    http://services.faa.gov/airport/status/ORD?format=application/json
    Connected with 127.0.0.1:33406
    http://services.faa.gov/airport/status/ATL?format=application/json
    Connected with 127.0.0.1:33408
    http://services.faa.gov/airport/status/LAX?format=application/json
    Connected with 127.0.0.1:33410
    http://services.faa.gov/airport/status/BOS?format=application/json
    Connected with 127.0.0.1:33412
    http://services.faa.gov/airport/status/STL?format=application/json
    Connected with 127.0.0.1:33414
    http://services.faa.gov/airport/status/DFW?format=application/json
    Connected with 127.0.0.1:33416
    http://services.faa.gov/airport/status/DEN?format=application/json
    Connected with 127.0.0.1:33418
    http://services.faa.gov/airport/status/SEA?format=application/json
    Connected with 127.0.0.1:33420
    http://services.faa.gov/airport/status/CLT?format=application/json
    Connected with 127.0.0.1:33422
    http://services.faa.gov/airport/status/JFK?format=application/json
    Connected with 127.0.0.1:33424
    http://services.faa.gov/airport/status/SFO?format=application/json
    Connected with 127.0.0.1:33426
    http://services.faa.gov/airport/status/BWI?format=application/json
    Connected with 127.0.0.1:33428
    http://services.faa.gov/airport/status/MIA?format=application/json
    Connected with 127.0.0.1:33430
    http://services.faa.gov/airport/status/ORD?format=application/json
    Connected with 127.0.0.1:33432
    http://services.faa.gov/airport/status/ATL?format=application/json
    Connected with 127.0.0.1:33434
    http://services.faa.gov/airport/status/LAX?format=application/json
    Connected with 127.0.0.1:33436
    http://services.faa.gov/airport/status/BOS?format=application/json
    Connected with 127.0.0.1:33438
    http://services.faa.gov/airport/status/STL?format=application/json
    Connected with 127.0.0.1:33440
    http://services.faa.gov/airport/status/DFW?format=application/json
    Connected with 127.0.0.1:33442
    http://services.faa.gov/airport/status/DEN?format=application/json
    Connected with 127.0.0.1:33444
    http://services.faa.gov/airport/status/SEA?format=application/json
    Connected with 127.0.0.1:33446
    http://services.faa.gov/airport/status/CLT?format=application/json
    Connected with 127.0.0.1:33448
    http://services.faa.gov/airport/status/JFK?format=application/json
    Connected with 127.0.0.1:33450
    http://services.faa.gov/airport/status/SFO?format=application/json
    Connected with 127.0.0.1:33452
    http://services.faa.gov/airport/status/BWI?format=application/json
    Connected with 127.0.0.1:33454
    http://services.faa.gov/airport/status/MIA?format=application/json
    Connected with 127.0.0.1:33457
    http://services.faa.gov/airport/status/ORD?format=application/json
    Connected with 127.0.0.1:33459
    http://services.faa.gov/airport/status/ATL?format=application/json
    Connected with 127.0.0.1:33461
    http://services.faa.gov/airport/status/LAX?format=application/json
    Connected with 127.0.0.1:33463
    http://services.faa.gov/airport/status/BOS?format=application/json
    Connected with 127.0.0.1:33465
    http://services.faa.gov/airport/status/STL?format=application/json
    Connected with 127.0.0.1:33467
    http://services.faa.gov/airport/status/DFW?format=application/json
    Connected with 127.0.0.1:33469
    http://services.faa.gov/airport/status/DEN?format=application/json
    Connected with 127.0.0.1:33471
    http://services.faa.gov/airport/status/SEA?format=application/json
    Connected with 127.0.0.1:33473
    http://services.faa.gov/airport/status/CLT?format=application/json
    

    Client side (sample):

    15/05/10 14:23:09 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 39.0 (TID 39) in 3 ms on localhost (1/1)
    15/05/10 14:23:09 INFO scheduler.DAGScheduler: Stage 39 (print at FaaFlightDelayStream.java:125) finished in 0.003 s
    15/05/10 14:23:09 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 39.0, whose tasks have all completed, from pool
    15/05/10 14:23:09 INFO scheduler.DAGScheduler: Job 39 finished: print at FaaFlightDelayStream.java:125, took 0.009611 s
    -------------------------------------------
    Time: 1431292989000 ms
    -------------------------------------------
    edu.hu.faa.spark.AirportStatus@3e6c76b2
    
    15/05/10 14:23:09 INFO scheduler.JobScheduler: Finished job streaming job 1431292989000 ms.0 from job set of time 1431292989000 ms
    15/05/10 14:23:09 INFO scheduler.JobScheduler: Total delay: 0.015 s for time 1431292989000 ms (execution: 0.011 s)
    15/05/10 14:23:09 INFO rdd.MapPartitionsRDD: Removing RDD 190 from persistence list
    15/05/10 14:23:09 INFO storage.BlockManager: Removing RDD 190
    15/05/10 14:23:09 INFO rdd.BlockRDD: Removing RDD 189 from persistence list
    15/05/10 14:23:09 INFO storage.BlockManager: Removing RDD 189
    15/05/10 14:23:09 INFO dstream.SocketInputDStream: Removing blocks of RDD BlockRDD[189] at socketTextStream at FaaFlightDelayStream.java:67 of time 1431292989000 ms
    15/05/10 14:23:09 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1431292987000 ms)
    -------------------------------------------15/05/10 14:23:10 INFO scheduler.JobScheduler: Starting job streaming job 1431292990000 ms.0 from job set of time 1431292990000 ms
    
    Time: 1431292990000 ms
    -------------------------------------------
    15/05/10 14:23:10 INFO scheduler.JobScheduler: Added jobs for time 1431292990000 ms
    
    15/05/10 14:23:10 INFO scheduler.JobScheduler: Finished job streaming job 1431292990000 ms.0 from job set of time 1431292990000 ms
    15/05/10 14:23:10 INFO scheduler.JobScheduler: Total delay: 0.004 s for time 1431292990000 ms (execution: 0.000 s)
    15/05/10 14:23:10 INFO rdd.MapPartitionsRDD: Removing RDD 192 from persistence list
    15/05/10 14:23:10 INFO storage.BlockManager: Removing RDD 192
    15/05/10 14:23:10 INFO rdd.BlockRDD: Removing RDD 191 from persistence list
    15/05/10 14:23:10 INFO storage.BlockManager: Removing RDD 191
    15/05/10 14:23:10 INFO dstream.SocketInputDStream: Removing blocks of RDD BlockRDD[191] at socketTextStream at FaaFlightDelayStream.java:67 of time 1431292990000 ms
    15/05/10 14:23:10 INFO storage.BlockManager: Removing block input-0-1431292988000
    15/05/10 14:23:10 INFO storage.MemoryStore: Block input-0-1431292988000 of size 510 dropped from memory (free 277876600)
    15/05/10 14:23:10 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1431292988000 ms)
    15/05/10 14:23:10 INFO storage.BlockManagerInfo: Removed input-0-1431292988000 on localhost:53643 in memory (size: 510.0 B, free: 265.1 MB)
    15/05/10 14:23:10 INFO storage.BlockManagerMaster: Updated info of block input-0-1431292988000
    15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Starting receiver again
    15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Starting receiver
    15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart
    15/05/10 14:23:10 INFO dstream.SocketReceiver: Connecting to localhost:8854
    15/05/10 14:23:10 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from akka://sparkDriver
    15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Receiver started again
    15/05/10 14:23:10 INFO dstream.SocketReceiver: Connected to localhost:8854
    15/05/10 14:23:10 INFO dstream.SocketReceiver: Stopped receiving
    15/05/10 14:23:10 INFO dstream.SocketReceiver: Closed socket to localhost:8854
    15/05/10 14:23:10 WARN receiver.ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Retrying connecting to localhost:8854
    15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Retrying connecting to localhost:8854:
    15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop
    15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0
    15/05/10 14:23:10 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Retrying connecting to localhost:8854
    15/05/10 14:23:10 INFO receiver.ReceiverSupervisorImpl: Stopped receiver 0
    15/05/10 14:23:10 INFO storage.MemoryStore: ensureFreeSpace(517) called with curMem=142840, maxMem=278019440
    15/05/10 14:23:10 INFO storage.MemoryStore: Block input-0-1431292990400 stored as bytes in memory (estimated size 517.0 B, free 265.0 MB)
    15/05/10 14:23:10 INFO storage.BlockManagerInfo: Added input-0-1431292990400 in memory on localhost:53643 (size: 517.0 B, free: 265.1 MB)
    15/05/10 14:23:10 INFO storage.BlockManagerMaster: Updated info of block input-0-1431292990400
    15/05/10 14:23:10 INFO receiver.BlockGenerator: Pushed block input-0-1431292990400
    15/05/10 14:23:11 INFO scheduler.JobScheduler: Added jobs for time 1431292991000 ms
    15/05/10 14:23:11 INFO scheduler.JobScheduler: Starting job streaming job 1431292991000 ms.0 from job set of time 1431292991000 ms
    15/05/10 14:23:11 INFO spark.SparkContext: Starting job: print at FaaFlightDelayStream.java:125
    15/05/10 14:23:11 INFO scheduler.DAGScheduler: Got job 40 (print at FaaFlightDelayStream.java:125) with 1 output partitions (allowLocal=true)
    15/05/10 14:23:11 INFO scheduler.DAGScheduler: Final stage: Stage 40(print at FaaFlightDelayStream.java:125)
    15/05/10 14:23:11 INFO scheduler.DAGScheduler: Parents of final stage: List()
    15/05/10 14:23:11 INFO scheduler.DAGScheduler: Missing parents: List()
    15/05/10 14:23:11 INFO scheduler.DAGScheduler: Submitting Stage 40 (MapPartitionsRDD[196] at map at FaaFlightDelayStream.java:71), which has no missing parents
    15/05/10 14:23:11 INFO storage.MemoryStore: ensureFreeSpace(2024) called with curMem=143357, maxMem=278019440
    15/05/10 14:23:11 INFO storage.MemoryStore: Block broadcast_40 stored as values in memory (estimated size 2024.0 B, free 265.0 MB)
    15/05/10 14:23:11 INFO storage.MemoryStore: ensureFreeSpace(1472) called with curMem=145381, maxMem=278019440
    15/05/10 14:23:11 INFO storage.MemoryStore: Block broadcast_40_piece0 stored as bytes in memory (estimated size 1472.0 B, free 265.0 MB)
    15/05/10 14:23:11 INFO storage.BlockManagerInfo: Added broadcast_40_piece0 in memory on localhost:53643 (size: 1472.0 B, free: 265.1 MB)
    15/05/10 14:23:11 INFO storage.BlockManagerMaster: Updated info of block broadcast_40_piece0
    15/05/10 14:23:11 INFO spark.SparkContext: Created broadcast 40 from broadcast at DAGScheduler.scala:839
    15/05/10 14:23:11 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 40 (MapPartitionsRDD[196] at map at FaaFlightDelayStream.java:71)
    15/05/10 14:23:11 INFO scheduler.TaskSchedulerImpl: Adding task set 40.0 with 1 tasks
    15/05/10 14:23:11 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 40.0 (TID 40, localhost, NODE_LOCAL, 1307 bytes)
    15/05/10 14:23:11 INFO executor.Executor: Running task 0.0 in stage 40.0 (TID 40)
    15/05/10 14:23:11 INFO storage.BlockManager: Found block input-0-1431292990400 locally
    15/05/10 14:23:11 INFO spark.FaaFlightDelayStream: Record: {"delay":"false","IATA":"CLT","state":"North Carolina","name":"Charlotte Douglas International","weather":{"visibility":10.00,"weather":"Mostly Cloudy","meta":{"credit":"NOAA's National Weather Service","updated":"4:52 PM Local","url":"http://weather.gov/"},"temp":"79.0 F (26.1 C)","wind":"North at 12.7mph"},"ICAO":"KCLT","city":"Charlotte","status":{"reason":"No known delays for this airport.","closureBegin":"","endTime":"","minDelay":"","avgDelay":"","maxDelay":"","closureEnd":"","trend":"","type":""}}
    15/05/10 14:23:11 INFO executor.Executor: Finished task 0.0 in stage 40.0 (TID 40). 937 bytes result sent to driver
    15/05/10 14:23:11 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 40.0 (TID 40) in 4 ms on localhost (1/1)
    15/05/10 14:23:11 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 40.0, whose tasks have all completed, from pool
    15/05/10 14:23:11 INFO scheduler.DAGScheduler: Stage 40 (print at FaaFlightDelayStream.java:125) finished in 0.004 s
    15/05/10 14:23:11 INFO scheduler.DAGScheduler: Job 40 finished: print at FaaFlightDelayStream.java:125, took 0.011392 s
    -------------------------------------------
    Time: 1431292991000 ms
    -------------------------------------------
    edu.hu.faa.spark.AirportStatus@6f857bb2
    
    15/05/10 14:23:11 INFO scheduler.JobScheduler: Finished job streaming job 1431292991000 ms.0 from job set of time 1431292991000 ms
    15/05/10 14:23:11 INFO scheduler.JobScheduler: Total delay: 0.016 s for time 1431292991000 ms (execution: 0.012 s)
    15/05/10 14:23:11 INFO rdd.MapPartitionsRDD: Removing RDD 194 from persistence list
    15/05/10 14:23:11 INFO storage.BlockManager: Removing RDD 194
    15/05/10 14:23:11 INFO rdd.BlockRDD: Removing RDD 193 from persistence list
    15/05/10 14:23:11 INFO storage.BlockManager: Removing RDD 193
    15/05/10 14:23:11 INFO dstream.SocketInputDStream: Removing blocks of RDD BlockRDD[193] at socketTextStream at FaaFlightDelayStream.java:67 of time 1431292991000 ms
    15/05/10 14:23:11 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer(1431292989000 ms)
    

    I added a client portion that reads the data and tallies how many times each airport reported delays over an hour’s period. Not that I’d want to keep this running for an hour, but I did want to keep the count active.
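
    Here is a minimal sketch of the tally, assuming Spark’s updateStateByKey (maintaining state across batches is what requires the checkpoint directory shown further down; the real code is in the repository linked at the end):

    import java.util.List;

    import com.google.common.base.Optional;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.streaming.api.java.JavaPairDStream;

    public class DelayTally {
        // delays is a stream of (airportCode, 1) pairs, one per delay report.
        // Before using this, the streaming context needs a checkpoint directory,
        // e.g. jssc.checkpoint("hdfs://sparkserver001:9000/checkpoint").
        public static JavaPairDStream<String, Integer> tally(JavaPairDStream<String, Integer> delays) {
            return delays.updateStateByKey(
                new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
                    public Optional<Integer> call(List<Integer> newValues, Optional<Integer> state) {
                        // Running total per airport, carried across batches
                        Integer sum = state.or(0);
                        for (Integer v : newValues) {
                            sum += v;
                        }
                        return Optional.of(sum);
                    }
                });
        }
    }

    Here’s the output for that portion of the console log: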

    -------------------------------------------
    Time: 1431301008000 ms
    -------------------------------------------
    (ORD,8)
    (ATL,0)
    (STL,0)
    (CLT,0)
    (LAX,0)
    (SFO,0)
    (BWI,0)
    (DEN,0)
    (DFW,0)
    (MIA,0)
    ….
    

    The checkpoint subdirectory I created appears thus:

    [hadoop@SparkServer001 FaaSpark]$ hadoop fs -ls /checkpoint
    Found 12 items
    drwxr-xr-x   - hadoop supergroup      	0 2015-05-10 16:36 /checkpoint/10f1ba86-b3c2-49cd-8432-5b0fc338eb08
    -rw-r--r--   3 hadoop supergroup   	8054 2015-05-10 16:36 /checkpoint/checkpoint-1431301003000
    -rw-r--r--   3 hadoop supergroup   	8054 2015-05-10 16:36 /checkpoint/checkpoint-1431301004000
    -rw-r--r--   3 hadoop supergroup   	8060 2015-05-10 16:36 /checkpoint/checkpoint-1431301004000.bk
    -rw-r--r--   3 hadoop supergroup   	8054 2015-05-10 16:36 /checkpoint/checkpoint-1431301005000
    -rw-r--r--   3 hadoop supergroup   	8060 2015-05-10 16:36 /checkpoint/checkpoint-1431301005000.bk
    -rw-r--r--   3 hadoop supergroup   	8054 2015-05-10 16:36 /checkpoint/checkpoint-1431301006000
    -rw-r--r--   3 hadoop supergroup   	8060 2015-05-10 16:36 /checkpoint/checkpoint-1431301006000.bk
    -rw-r--r--   3 hadoop supergroup   	8054 2015-05-10 16:36 /checkpoint/checkpoint-1431301007000
    -rw-r--r--   3 hadoop supergroup   	8060 2015-05-10 16:36 /checkpoint/checkpoint-1431301007000.bk
    -rw-r--r--   3 hadoop supergroup   	8060 2015-05-10 16:36 /checkpoint/checkpoint-1431301008000
    -rw-r--r--   3 hadoop supergroup      	0 2015-05-10 16:36 /checkpoint/temp
    

    The content of the files is binary, and I don’t remember off the top of my head which flag converts them to text for reading.

    The code I wrote can be found here:

    https://gitlab.com/amracel/FaaSpark

Creating a CXF Client Application

    I’m creating a client that accesses our in-house SOAP web services. Our code base uses Spring, so I need to incorporate that. The SOAP service has security, so I need to get that working, too. We use Maven as our build tool, so I need to bring that in as well. Here are the steps I’m following to get it to work. I’ll edit these as I go.

    1. Download and install Apache CXF.
    2. Install the security certificate on your system using InstallCert. If that code ever goes missing, we’re all in trouble!
      • compile the code
      • run the instructions as written
      • move the jssecacerts file to the /jre/lib/security folder of the JRE you’re using for your system
    3. Create a generic maven jar project (#5) using maven archetype:generate.
    4. Add the CXF maven entries to your pom.xml. For example, the usual client-side artifacts (with cxf.version set to the CXF release you installed):
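      <dependency>
        <groupId>org.apache.cxf</groupId>
        <artifactId>cxf-rt-frontend-jaxws</artifactId>
        <version>${cxf.version}</version>
      </dependency>
      <dependency>
        <groupId>org.apache.cxf</groupId>
        <artifactId>cxf-rt-transports-http</artifactId>
        <version>${cxf.version}</version>
      </dependency>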
    5. Run wsdl2java to generate your base code. For example (the URL is a placeholder for your service’s WSDL):
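      wsdl2java -d src/main/java -client https://myserver/myservice?wsdl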
    6. Create a password callback class:
      import java.io.IOException;
      
      import javax.security.auth.callback.Callback;
      import javax.security.auth.callback.CallbackHandler;
      import javax.security.auth.callback.UnsupportedCallbackException;
      
      import org.apache.ws.security.WSPasswordCallback;
      
      public class ClientPasswordCallback implements CallbackHandler {
      
      	private String system;
      	public ClientPasswordCallback() {
      		// TODO Auto-generated constructor stub
      	}
      
      	public void handle(Callback[] callbacks) throws IOException,
      			UnsupportedCallbackException {
      		WSPasswordCallback pc = (WSPasswordCallback) callbacks[0];
      
      		pc.setPassword("My password");
      
      	}
      	public void setSystem(String pVal)
      	{
      		system = pVal;
      	}
      }
    7. Add the security and bean information to your applicationContext.xml file:
      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans"
             xmlns:jaxws="http://cxf.apache.org/jaxws"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="
      http://www.springframework.org/schema/beans 
      http://www.springframework.org/schema/beans/spring-beans.xsd
      http://cxf.apache.org/jaxws http://cxf.apache.org/schemas/jaxws.xsd">
      <jaxws:client id="[port bean id]"
                        serviceClass="[my port class]"
                        address="[https://mywsdl]">
         <jaxws:outInterceptors>
           <bean class="org.apache.cxf.binding.soap.saaj.SAAJOutInterceptor"/>
           <ref bean="wss4jOutConfiguration"/>
         </jaxws:outInterceptors>
      </jaxws:client>
      
      <bean id="wss4jOutConfiguration" class="org.apache.cxf.ws.security.wss4j.WSS4JOutInterceptor">
        <constructor-arg>
          <map>
             <entry key="action" value="UsernameToken"/>
             <entry key="passwordType" value="PasswordText"/>
             <entry key="user" value="[my login]"/>
             <entry key="passwordCallbackRef">
               <ref bean="passwordCallback"/>
             </entry>
          </map>
        </constructor-arg>
      </bean>
      
      <bean id="passwordCallback" class="[my callback class]">
         <property name="system" value="test"/>
      </bean>
      </beans>
    8. Modify the test case to use your beans:
      ApplicationContext context = new ClassPathXmlApplicationContext(
              "/applicationContext.xml");
      port = ([port class]) context.getBean("[beanid]");

      ObjectFactory factory = new ObjectFactory();

      [request class] myRequest = factory.create[request class]();
    9. Delete the [beanname]Service.java class. You don’t need it when the client is wired up through Spring.