The main language they developed spark with is scala, so all the new
features go first to scala, java and finally python. I'm not surprised by
the results, we've seen it on Stratio since the first versions of spark. At
the beginning of development, some of our engineers make the prototype with
python, but when it comes down to it, if it goes into production, it has to
be rewritten in scala or java, usually scala.



El lun., 21 may. 2018 a las 4:34, Saulo Sobreiro (<saulo.sobre...@outlook.pt>)
escribió:

> Hi Javier,
>
> Thank you a lot for the feedback.
> Indeed the CPU is a huge limitation. I got a lot of trouble trying to run
> this use case in yarn-client mode. I managed to run this in standalone
> (local master) mode only.
>
> I do not have the hardware available to run this setup in a cluster yet,
> so I decided to dig a little bit more in the implementation to see what
> could I improve. I just finished evaluating some results.
> If you find something wrong or odd please let me know.
>
> Following your suggestion to use "saveToCassandra" directly I decided to
> try Scala. Everything was implemented in the most similar way possible and
> I got surprised by the results. The scala implementation is much faster.
>
> My current implementation is slightly different from the Python code
> shared some emails ago but to compare the languages influence in the most
> comparable way I used the following snippets:
>
> # Scala implementation ------------------
>
> val kstream = KafkaUtils.createDirectStream[String, String](
>                  ssc,
>                  LocationStrategies.PreferConsistent,
>                  ConsumerStrategies.Subscribe[String, String](topic,
> kafkaParams))
> kstream
>            .map( x => parse(x.value) )
>            .saveToCassandra("hdpkns", "batch_measurement")
>
> # Python implementation ----------------
> # Adapted from the previously shared code. However instead of calculating
> the metrics, it is just parsing the messages.
>
> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>>> {"metadata.broker.list": brokers})
>>>>
>>>
>>>> kafkaStream \
>>>>     .transform(parse) \
>>>>     .foreachRDD(casssave)
>>>>
>>>>
> For the same streaming input the scala app took an average of ~1.5 seconds
> to handle each event. For the python implementation, the app took an
> average of ~80 seconds to handle each event (and after a lot of pickle
> concurrency access issues).
>
> Note that I considered the time as the difference between the event
> generation (before being published to Kafka) and the moment just before the
> saveToCassandra.
>
> The problem in the python implementation seems to be due to the delay
> introduced by the foreachRDD(casssave) call, which only runs 
> rdd.saveToCassandra(
> "test_hdpkns", "measurement" ).
>
>
> Honestly I was not expecting such a difference between these 2 codes...
> Can you understand why is this happening ?
>
>
>
> Again, Thank you very much for your help,
>
> Best Regards
>
>
> Sharing my current Scala code below
> # Scala Snippet =========================
> val sparkConf = new SparkConf(). // ...
> val ssc = new StreamingContext(sparkConf, Seconds(1))
> val sc = ssc.sparkContext
> //...
> val kstream = KafkaUtils.createDirectStream[String, String](
>                  ssc,
>                  LocationStrategies.PreferConsistent,
>                  ConsumerStrategies.Subscribe[String, String](topic,
> kafkaParams))
> //...
> // handle Kafka messages in a parallel fashion
> val ckstream = kstream.map( x => parse(x.value) ).cache()
> ckstream
>               .foreachRDD( rdd => {
>                     rdd.foreach(metrics)
>               } )
> ckstream
>               .saveToCassandra("hdpkns", "microbatch_raw_measurement")
> #=========================
>
> On 30/04/2018 14:57:50, Javier Pareja <pareja.jav...@gmail.com> wrote:
> Hi Saulo,
>
> If the CPU is close to 100% then you are hitting the limit. I don't think
> that moving to Scala will make a difference. Both Spark and Cassandra are
> CPU hungry, your setup is small in terms of CPUs. Try running Spark on
> another (physical) machine so that the 2 cores are dedicated to Cassandra.
>
> Kind Regards
> Javier
>
>
>
> On Mon, 30 Apr 2018, 14:24 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
> wrote:
>
>> Hi Javier,
>>
>> I will try to implement this in scala then. As far as I can see in the
>> documentation there is no SaveToCassandra in the python interface unless
>> you are working with dataframes and the kafkaStream instance does not
>> provide methods to convert an RDD into DF.
>>
>> Regarding my table, it is very simple (see below). Can I change something
>> to make it write faster?
>> CREATE TABLE test_hdpkns.measurement (
>>   mid bigint,
>>   tt timestamp,
>>   in_tt timestamp,
>>   out_tt timestamp,
>>   sensor_id int,
>>   measure double,
>>   PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
>> ) with compact storage;
>>
>> The system CPU while the demo is running is almost always at 100% for
>> both cores.
>>
>>
>> Thank you.
>>
>> Best Regards,
>>
>> On 29/04/2018 20:46:30, Javier Pareja <pareja.jav...@gmail.com> wrote:
>> Hi Saulo,
>>
>> I meant using this to save:
>>
>> https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#writing-to-cassandra-from-a-stream
>>
>> But it might be slow on a different area.
>> Another point is that Cassandra and spark running on the same machine
>> might compete for resources which will slow down the insert. You can check
>> the CPU usage of the machine at the time. Also the design of the table
>> schema can make a big difference.
>>
>>
>> On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
>> wrote:
>>
>>> Hi Javier,
>>>
>>>
>>> I removed the map and used "map" directly instead of using transform,
>>> but the *kafkaStream* is created with KafkaUtils which does not have a
>>> method to save to cassandra directly.
>>>
>>> Do you know any workarround for this?
>>>
>>>
>>> Thank you for the suggestion.
>>>
>>> Best Regards,
>>>
>>> On 29/04/2018 17:03:24, Javier Pareja <pareja.jav...@gmail.com> wrote:
>>> Hi Saulo,
>>>
>>> I'm no expert but I will give it a try.
>>> I would remove the rdd2.count(), I can't see the point and you will gain
>>> performance right away. Because of this, I would not use a transform, just
>>> directly the map.
>>> I have not used python but in Scala the cassandra-spark connector can
>>> save directly to Cassandra without a foreachRDD.
>>>
>>> Finally I would use the spark UI to find which stage is the bottleneck
>>> here.
>>>
>>> On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <saulo.sobre...@outlook.pt>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am implementing a use case where I read some sensor data from Kafka
>>>> with SparkStreaming interface (*KafkaUtils.createDirectStream*) and,
>>>> after some transformations, write the output (RDD) to Cassandra.
>>>>
>>>> Everything is working properly but I am having some trouble with the
>>>> performance. My kafka topic receives around 2000 messages per second. For a
>>>> 4 min. test, the SparkStreaming app takes 6~7 min. to process and write to
>>>> Cassandra, which is not acceptable for longer runs.
>>>>
>>>> I am running this application in a "sandbox" with 12GB of RAM, 2 cores
>>>> and 30GB SSD space.
>>>> Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).
>>>>
>>>> I would like to know you have some suggestion to improve performance
>>>> (other than getting more resources :) ).
>>>>
>>>> My code (pyspark) is posted in the end of this email so you can take a
>>>> look. I tried some different cassandra configurations following this link:
>>>> http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark
>>>> (recommended in stackoverflow for similar questions).
>>>>
>>>>
>>>> Thank you in advance,
>>>>
>>>> Best Regards,
>>>> Saulo
>>>>
>>>>
>>>>
>>>> =============== # CODE # =================================
>>>> ####
>>>> # run command:
>>>> # spark2-submit --packages
>>>> org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2
>>>>  --conf spark.cassandra.connection.host='localhost' --num-executors 2
>>>> --executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2
>>>> ##
>>>>
>>>> # Run Spark imports
>>>> from pyspark import SparkConf # SparkContext, SparkConf
>>>> from pyspark.streaming import StreamingContext
>>>> from pyspark.streaming.kafka import KafkaUtils
>>>>
>>>> # Run Cassandra imports
>>>> import pyspark_cassandra
>>>> from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>>>>
>>>> def recordHandler(record):
>>>>     (mid, tt, in_tt, sid, mv) = parseData( record )
>>>>     return processMetrics(mid, tt, in_tt, sid, mv)
>>>>
>>>> def process(time, rdd):
>>>>     rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
>>>>     if rdd2.count() > 0:
>>>>         return rdd2
>>>>
>>>> def casssave(time, rdd):
>>>>     rdd.saveToCassandra( "test_hdpkns", "measurement" )
>>>>
>>>> # ...
>>>> brokers, topic = sys.argv[1:]
>>>>
>>>> # ...
>>>>
>>>> sconf = SparkConf() \
>>>>         .setAppName("SensorDataStreamHandler") \
>>>>         .setMaster("local[*]") \
>>>>         .set("spark.default.parallelism", "2")
>>>>
>>>> sc = CassandraSparkContext(conf = sconf)
>>>> batchIntervalSeconds = 2
>>>> ssc = StreamingContext(sc, batchIntervalSeconds)
>>>>
>>>> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>>>> {"metadata.broker.list": brokers})
>>>>
>>>> kafkaStream \
>>>>     .transform(process) \
>>>>     .foreachRDD(casssave)
>>>>
>>>> ssc.start()
>>>> ssc.awaitTermination()
>>>>
>>>> ================================================
>>>>
>>>>
>>>>
>>>>
>>>>

-- 
Alonso Isidoro Roman
[image: https://]about.me/alonso.isidoro.roman
<https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>

Reply via email to