Hi Saulo,

I meant using this to save:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#writing-to-cassandra-from-a-stream
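
For the Python side, a minimal sketch of what that could look like. It assumes that the pyspark-cassandra package adds a saveToCassandra method to DStreams once pyspark_cassandra.streaming is imported; that is my reading of its README, and I have not checked it against 0.7.0. The helper name save_stream and its parameters are made up for illustration; recordHandler, the keyspace and the table would be the ones from your code.

```python
def save_stream(kafka_stream, record_handler, keyspace, table):
    """Map each Kafka (key, value) pair and write the result to Cassandra.

    Assumption: `import pyspark_cassandra.streaming` has already run in the
    driver, so the stream passed in exposes a saveToCassandra method.
    """
    # No transform() and no count(): map is a lazy transformation, and the
    # Cassandra write is the only output operation on the stream.
    rows = kafka_stream.map(lambda kv: record_handler(kv[1]))
    rows.saveToCassandra(keyspace, table)
    return rows
```

In your script this would replace the transform/foreachRDD chain, for example save_stream(kafkaStream, recordHandler, "test_hdpkns", "measurement"), called before ssc.start().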

But the bottleneck might be somewhere else.
Another point is that Cassandra and Spark running on the same machine might
compete for resources, which would slow down the inserts. You can check the
CPU usage of the machine while the job is running. Also, the design of the
table schema can make a big difference.
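
As a rough sanity check on the numbers from your first mail (2000 messages per second, 2 s batches, a 4 min test that takes 6 to 7 min), here is a small back-of-the-envelope calculation. The function names are mine, just for illustration, and it only relies on the general rule that a streaming job keeps up when each batch finishes within the batch interval.

```python
# Figures quoted in the thread.
RATE_MSG_PER_S = 2000          # messages per second arriving on the Kafka topic
BATCH_INTERVAL_S = 2           # batchIntervalSeconds in the posted code
TEST_DURATION_S = 4 * 60       # length of the test
OBSERVED_S = (6 * 60, 7 * 60)  # reported time to process and write everything


def records_per_batch(rate, interval_s):
    """Number of records each micro-batch has to handle."""
    return rate * interval_s


def implied_batch_time(interval_s, test_s, observed_s):
    """Average processing time per batch implied by the total run time."""
    return interval_s * observed_s / test_s


def effective_throughput(rate, test_s, observed_s):
    """Records per second actually written over the whole run."""
    return rate * test_s / observed_s


print("records per batch:", records_per_batch(RATE_MSG_PER_S, BATCH_INTERVAL_S))
for observed in OBSERVED_S:
    batch_time = implied_batch_time(BATCH_INTERVAL_S, TEST_DURATION_S, observed)
    throughput = effective_throughput(RATE_MSG_PER_S, TEST_DURATION_S, observed)
    print("run of %d s: %.1f s per batch, %.0f records/s"
          % (observed, batch_time, throughput))
```

With these figures each 2 s batch carries 4000 records and takes about 3 to 3.5 s on average, so the job falls behind by 1 to 1.5 s per batch. The processing and write path would need to get roughly 1.5 to 1.75 times faster to keep up.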


On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
wrote:

> Hi Javier,
>
>
> I removed the map and used "map" directly instead of using transform, but
> the *kafkaStream* is created with KafkaUtils, which does not have a method
> to save to Cassandra directly.
>
> Do you know any workaround for this?
>
>
> Thank you for the suggestion.
>
> Best Regards,
>
> On 29/04/2018 17:03:24, Javier Pareja <pareja.jav...@gmail.com> wrote:
> Hi Saulo,
>
> I'm no expert but I will give it a try.
> I would remove the rdd2.count(); I can't see the point of it, and you will
> gain performance right away. Without it, I would not use transform at all,
> just map directly.
> I have not used Python, but in Scala the Spark Cassandra connector can save
> directly to Cassandra without a foreachRDD.
>
> Finally, I would use the Spark UI to find which stage is the bottleneck
> here.
>
> On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
> wrote:
>
>> Hi all,
>>
>> I am implementing a use case where I read some sensor data from Kafka
>> with the Spark Streaming interface (*KafkaUtils.createDirectStream*) and,
>> after some transformations, write the output (RDD) to Cassandra.
>>
>> Everything is working properly but I am having some trouble with the
>> performance. My Kafka topic receives around 2000 messages per second. For a
>> 4 min. test, the Spark Streaming app takes 6~7 min. to process and write to
>> Cassandra, which is not acceptable for longer runs.
>>
>> I am running this application in a "sandbox" with 12GB of RAM, 2 cores
>> and 30GB SSD space.
>> Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).
>>
>> I would like to know if you have any suggestions to improve performance
>> (other than getting more resources :) ).
>>
>> My code (pyspark) is posted at the end of this email so you can take a
>> look. I tried some different Cassandra configurations following this link:
>> http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark
>> (recommended on Stack Overflow for similar questions).
>>
>>
>> Thank you in advance,
>>
>> Best Regards,
>> Saulo
>>
>>
>>
>> =============== # CODE # =================================
>> ####
>> # run command:
>> # spark2-submit --packages \
>> #   org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2 \
>> #   --conf spark.cassandra.connection.host='localhost' --num-executors 2 \
>> #   --executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2
>> ##
>>
>> # Standard library (sys.argv is read further down)
>> import sys
>>
>> # Run Spark imports
>> from pyspark import SparkConf # SparkContext, SparkConf
>> from pyspark.streaming import StreamingContext
>> from pyspark.streaming.kafka import KafkaUtils
>>
>> # Run Cassandra imports
>> import pyspark_cassandra
>> from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>>
>> def recordHandler(record):
>>     (mid, tt, in_tt, sid, mv) = parseData( record )
>>     return processMetrics(mid, tt, in_tt, sid, mv)
>>
>> def process(time, rdd):
>>     rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
>>     if rdd2.count() > 0:
>>         return rdd2
>>
>> def casssave(time, rdd):
>>     rdd.saveToCassandra( "test_hdpkns", "measurement" )
>>
>> # ...
>> brokers, topic = sys.argv[1:]
>>
>> # ...
>>
>> sconf = SparkConf() \
>>         .setAppName("SensorDataStreamHandler") \
>>         .setMaster("local[*]") \
>>         .set("spark.default.parallelism", "2")
>>
>> sc = CassandraSparkContext(conf = sconf)
>> batchIntervalSeconds = 2
>> ssc = StreamingContext(sc, batchIntervalSeconds)
>>
>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>> {"metadata.broker.list": brokers})
>>
>> kafkaStream \
>>     .transform(process) \
>>     .foreachRDD(casssave)
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>> ================================================
>>
>>
>>
>>
>>
