Hi Javier,

I removed the rdd2.count() and now use "map" directly instead of transform, but 
the kafkaStream is created with KafkaUtils, which does not have a method to save 
to Cassandra directly.

Do you know of any workaround for this?


Thank you for the suggestion.

Best Regards,

On 29/04/2018 17:03:24, Javier Pareja <pareja.jav...@gmail.com> wrote:

Hi Saulo,

I'm no expert, but I will give it a try.
I would remove the rdd2.count(): I can't see the point of it, and you will gain 
performance right away. Without the count, I would not use a transform at all, 
just the map directly.
I have not used Python, but in Scala the Spark Cassandra connector can save 
directly to Cassandra without a foreachRDD.

Finally, I would use the Spark UI to find which stage is the bottleneck here.

On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <saulo.sobre...@outlook.pt> wrote:

Hi all,

I am implementing a use case where I read sensor data from Kafka with the 
Spark Streaming interface (KafkaUtils.createDirectStream) and, after some 
transformations, write the output (RDD) to Cassandra.

Everything is working properly but I am having some trouble with the 
performance. My kafka topic receives around 2000 messages per second. For a 4 
min. test, the SparkStreaming app takes 6~7 min. to process and write to 
Cassandra, which is not acceptable for longer runs.
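To put rough numbers on this, here is a back-of-the-envelope sketch. It assumes a steady 2000 messages per second, that the 6~7 min. is the total time needed to work through the 4 min. of input, and that batches are processed one after another. The function names are only illustrative.

```python
# Figures quoted in the message above.
RATE_IN = 2000            # messages per second arriving on the Kafka topic
TEST_SECONDS = 4 * 60     # length of the test run
BATCH_SECONDS = 2         # batchIntervalSeconds in the posted code

total_messages = RATE_IN * TEST_SECONDS       # messages produced during the test
records_per_batch = RATE_IN * BATCH_SECONDS   # messages in one micro-batch


def sustained_rate(minutes):
    """Messages per second actually processed if the run takes `minutes`."""
    return total_messages / (minutes * 60)


def avg_batch_seconds(minutes):
    """Average processing time per batch if the run takes `minutes` in total."""
    n_batches = TEST_SECONDS / BATCH_SECONDS
    return minutes * 60 / n_batches


for minutes in (6, 7):
    print(minutes, round(sustained_rate(minutes)), avg_batch_seconds(minutes))
```

Under these assumptions each 2-second batch holds about 4000 records and takes roughly 3 to 3.5 seconds to process, so batches queue up faster than they are cleared. To keep up, the average batch time would have to stay below the 2-second batch interval.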

I am running this application in a "sandbox" with 12GB of RAM, 2 cores and 30GB 
SSD space.
Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).

I would like to know if you have any suggestions to improve performance (other 
than getting more resources :) ).

My code (pyspark) is posted at the end of this email so you can take a look. I 
tried some different Cassandra configurations following this link: 
http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark
 (recommended on Stack Overflow for similar questions).


Thank you in advance,

Best Regards,
Saulo



=============== # CODE # =================================
####
# run command:
# spark2-submit --packages \
#   org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2 \
#   --conf spark.cassandra.connection.host='localhost' --num-executors 2 \
#   --executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2
##

import sys  # used below to read brokers and topic from the command line

# Run Spark imports
from pyspark import SparkConf # SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Run Cassandra imports
import pyspark_cassandra
from pyspark_cassandra import CassandraSparkContext, saveToCassandra

def recordHandler(record):
    (mid, tt, in_tt, sid, mv) = parseData( record )
    return processMetrics(mid, tt, in_tt, sid, mv)

def process(time, rdd):
    rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
    if rdd2.count() > 0:
        return rdd2

def casssave(time, rdd):
    rdd.saveToCassandra( "test_hdpkns", "measurement" )

# ...
brokers, topic = sys.argv[1:]

# ...

sconf = SparkConf() \
        .setAppName("SensorDataStreamHandler") \
        .setMaster("local[*]") \
        .set("spark.default.parallelism", "2")

sc = CassandraSparkContext(conf = sconf)
batchIntervalSeconds = 2
ssc = StreamingContext(sc, batchIntervalSeconds)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], 
{"metadata.broker.list": brokers})

kafkaStream \
    .transform(process) \
    .foreachRDD(casssave)

ssc.start()
ssc.awaitTermination()

================================================
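For reference, a minimal sketch of the simplification suggested in Javier's reply: call map on the DStream itself and skip empty batches with isEmpty() instead of count(). The comma-separated message layout, the column names, and the names record_handler, save_batch and build_pipeline are all assumptions made for illustration, since parseData and processMetrics are not shown in this thread. Only the keyspace and table names come from the posted code.

```python
KEYSPACE = "test_hdpkns"
TABLE = "measurement"


def record_handler(record):
    # Hypothetical stand-in for parseData + processMetrics: assumes each
    # Kafka message value looks like "mid,tt,in_tt,sid,mv".
    mid, tt, in_tt, sid, mv = record.split(",")
    return {"mid": mid, "tt": int(tt), "in_tt": int(in_tt),
            "sid": sid, "mv": float(mv)}


def save_batch(time, rdd):
    # isEmpty() only needs to find a first element, whereas count()
    # evaluates the whole batch before the save evaluates it again.
    if not rdd.isEmpty():
        rdd.saveToCassandra(KEYSPACE, TABLE)


def build_pipeline(kafka_stream):
    # Importing pyspark_cassandra is what makes saveToCassandra
    # available on RDDs, as in the posted code.
    import pyspark_cassandra  # noqa: F401

    # map() exists on the DStream, so transform() is not needed.
    # Each Kafka record arrives as a (key, value) pair.
    rows = kafka_stream.map(lambda kv: record_handler(kv[1]))
    rows.foreachRDD(save_batch)
    return rows
```

With the posted setup this would be called as build_pipeline(kafkaStream) before ssc.start(). The pyspark-cassandra README also describes a saveToCassandra method on DStreams once pyspark_cassandra.streaming is imported, which might remove the need for foreachRDD. That is not verified here against version 0.7.0.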



