Hi all,

I am implementing a use case where I read sensor data from Kafka with the Spark Streaming interface (KafkaUtils.createDirectStream) and, after some transformations, write the output RDDs to Cassandra.

Everything works, but I am having trouble with performance. My Kafka topic receives around 2,000 messages per second; with a 2-second batch interval that is roughly 4,000 records per micro-batch, and for a 4-minute test the Spark Streaming app takes 6 to 7 minutes to process and write to Cassandra, which is not acceptable for longer runs.

I am running this application in a "sandbox" with 12 GB of RAM, 2 cores and 30 GB of SSD space, on HDP with Spark 2.1.

I would like to know if you have any suggestions to improve performance (other than getting more resources :) ). My code (pyspark) is posted at the end of this email so you can take a look.

Thank you in advance,
Best Regards,
Saulo

=============== # CODE # =================================
####
# run command:
# spark2-submit \
#   --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2 \
#   --conf spark.cassandra.connection.host='localhost' \
#   --num-executors 2 --executor-cores 2 \
#   SensorDataStreamHandler.py localhost:6667 test_topic2
##

import sys

# Spark imports
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Cassandra imports
import pyspark_cassandra
from pyspark_cassandra import CassandraSparkContext, saveToCassandra

def recordHandler(record):
    # parseData and processMetrics are application-specific helpers
    (mid, tt, in_tt, sid, mv) = parseData(record)
    return processMetrics(mid, tt, in_tt, sid, mv)

def process(time, rdd):
    # transform() must return an RDD for every batch, including empty
    # ones; guarding on rdd.count() here would force an extra job per
    # batch, so the mapped RDD is returned unconditionally.
    return rdd.map(lambda w: recordHandler(w[1]))

def casssave(time, rdd):
    # write each micro-batch to Cassandra
    rdd.saveToCassandra("test_hdpkns", "measurement")

# ...
brokers, topic = sys.argv[1:]
# ...

sconf = SparkConf() \
    .setAppName("SensorDataStreamHandler") \
    .setMaster("local[*]") \
    .set("spark.default.parallelism", "2")

sc = CassandraSparkContext(conf=sconf)

batchIntervalSeconds = 2
ssc = StreamingContext(sc, batchIntervalSeconds)

kafkaStream = KafkaUtils.createDirectStream(
    ssc, [topic], {"metadata.broker.list": brokers})

kafkaStream \
    .transform(process) \
    .foreachRDD(casssave)

ssc.start()
ssc.awaitTermination()
================================================
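
P.S.: parseData and processMetrics are application-specific helpers that I left out of the snippet above. If you want to run it end-to-end, a purely hypothetical stand-in (the real message format and column names are not shown here) could be:

# Hypothetical stand-ins for the omitted helpers, assuming a
# comma-separated payload of five fields; the real parsing logic
# and the measurement table's column names will differ.
def parseData(record):
    mid, tt, in_tt, sid, mv = record.split(",")
    return (mid, tt, in_tt, sid, float(mv))

def processMetrics(mid, tt, in_tt, sid, mv):
    # pyspark-cassandra can save dicts whose keys match column names
    return {"mid": mid, "tt": tt, "in_tt": in_tt, "sid": sid, "mv": mv}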
I am implementing a use case where I read some sensor data from Kafka with SparkStreaming interface (KafkaUtils.createDirectStream) and, after some transformations, write the output (RDD) to Cassandra. Everything is working properly but I am having some trouble with the performance. My kafka topic receives around 2000 messages per second. For a 4 min. test, the SparkStreaming app takes 6~7 min. to process and write to Cassandra, which is not acceptable for longer runs. I am running this application in a "sandbox" with 12GB of RAM, 2 cores and 30GB SSD space. HDP: Spark 2.1 I would like to know you have some suggestion to improve performance (other than getting more resources :) ). My code (pyspark) is posted in the end of this email so you can take a look. Thank you in advance, Best Regards, Saulo =============== # CODE # ================================= #### # run command: # spark2-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2 --conf spark.cassandra.connection.host='localhost' --num-executors 2 --executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2 ## # Run Spark imports from pyspark import SparkConf # SparkContext, SparkConf from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils # Run Cassandra imports import pyspark_cassandra from pyspark_cassandra import CassandraSparkContext, saveToCassandra def recordHandler(record): (mid, tt, in_tt, sid, mv) = parseData( record ) return processMetrics(mid, tt, in_tt, sid, mv) def process(time, rdd): rdd2 = rdd.map( lambda w: recordHandler(w[1]) ) if rdd2.count() > 0: return rdd2 def casssave(time, rdd): rdd.saveToCassandra( "test_hdpkns", "measurement" ) # ... brokers, topic = sys.argv[1:] # ... sconf = SparkConf() \ .setAppName("SensorDataStreamHandler") \ .setMaster("local[*]") \ .set("spark.default.parallelism", "2") sc = CassandraSparkContext(conf = sconf) batchIntervalSeconds = 2 ssc = StreamingContext(sc, batchIntervalSeconds) kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) kafkaStream \ .transform(process) \ .foreachRDD(casssave) ssc.start() ssc.awaitTermination() ================================================