Re: [Spark2.1] SparkStreaming to Cassandra performance problem

2018-05-28 Thread Saulo Sobreiro
Hi,
I ran a few more tests and found that even with a lot more operations on the 
Scala side, Python is still outperformed...

Dataset stream duration: ~3 minutes (CSV-formatted data messages read from 
Kafka)
Scala process/store time: ~3 minutes (map with split + metrics calculations + 
store raw + store metrics)
Python process/store time: ~7 minutes (map with split + store raw)

This is the difference between being usable in production or not. I get that 
Python is likely to be slower because of the Python-Java object 
transformations, but I was not expecting such a huge difference.
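A rough stdlib-only sketch (not Spark code) of why per-record conversion adds up: every record that crosses the Python side of the boundary is serialized and deserialized individually, so the cost scales with record count rather than with data volume. Here pickle stands in for the per-object hop:

```python
import pickle
import time

# 100k small records, like parsed sensor tuples.
records = [("sensor-%d" % (i % 8), i, i * 0.5) for i in range(100_000)]

# Per-record round trip: one serialize/deserialize hop per record,
# loosely mimicking per-object conversion on the Python side.
t0 = time.perf_counter()
per_record = [pickle.loads(pickle.dumps(r)) for r in records]
per_record_s = time.perf_counter() - t0

# Batch round trip: a single hop for the whole collection.
t0 = time.perf_counter()
batched = pickle.loads(pickle.dumps(records))
batch_s = time.perf_counter() - t0

assert per_record == batched == records  # identical data either way
print(f"per-record: {per_record_s:.3f}s  batch: {batch_s:.3f}s")
```

On a typical machine the per-record path is several times slower for exactly the same data, which is the shape of overhead being discussed here.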

These results are very interesting: I was comparing against the time an 
"equivalent" application in Storm takes to process the exact same stream for 
the same results (~3 minutes as well), and Spark was clearly losing the race.

Thank you all for your feedback :)

Regards,
Saulo

On 21/05/2018 14:09:40, Russell Spitzer  wrote:

The answer is most likely that when you mix Java and Python code you incur 
a penalty for every object that you transform from a Java object into a Python 
object (and then back again into a Java object) when data is passed in 
and out of your functions. A way around this would probably be to use the 
Dataframe API if possible, which would have compiled the interactions in Java 
and skipped the Python-Java serialization. Using Scala from the start, though, 
is a great idea. I would also probably remove the cache from your stream since 
it is probably only hurting (adding an additional serialization for data which 
is only used once).
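The cache remark can be illustrated with a plain-Python analogy (not Spark code, and a simplification of how Spark caching actually works): materializing a serialized copy of an intermediate result that is consumed only once adds work without any reuse benefit.

```python
import pickle

data = list(range(500_000))

def transform(xs):
    # Stand-in for the map/parse step.
    return [2 * x + 1 for x in xs]

# Direct path: compute and consume once.
direct = sum(transform(data))

# "Cached" path: materialize a serialized copy first, then consume once.
# The extra serialize/deserialize cycle buys nothing without a second consumer.
cached_blob = pickle.dumps(transform(data))
via_cache = sum(pickle.loads(cached_blob))

assert direct == via_cache  # same result, strictly more work
```

If the stream feeds two actions, caching can pay off; for a single consumer it is pure overhead.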

On Mon, May 21, 2018 at 5:01 AM Alonso Isidoro Roman 
mailto:alons...@gmail.com>> wrote:
Spark's main development language is Scala, so new features land first in 
Scala, then Java, and finally Python. I'm not surprised by the results; we 
have seen this at Stratio since the first versions of Spark. At the beginning 
of development some of our engineers prototype in Python, but when it comes 
down to it, if it goes into production it has to be rewritten in Scala or 
Java, usually Scala.




Re: [Spark2.1] SparkStreaming to Cassandra performance problem

2018-05-20 Thread Saulo Sobreiro
Hi Javier,

Thank you a lot for the feedback.
Indeed the CPU is a huge limitation. I ran into a lot of trouble trying to run 
this use case in yarn-client mode; I managed to run it only in standalone 
(local master) mode.

I do not have the hardware available to run this setup in a cluster yet, so I 
decided to dig a little deeper into the implementation to see what I could 
improve. I just finished evaluating some results.
If you find something wrong or odd, please let me know.

Following your suggestion to use "saveToCassandra" directly, I decided to try 
Scala. Everything was implemented as similarly as possible, and I was 
surprised by the results: the Scala implementation is much faster.

My current implementation is slightly different from the Python code shared 
some emails ago, but to compare the languages' influence as directly as 
possible I used the following snippets:

# Scala implementation --

val kstream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))

kstream
  .map( x => parse(x.value) )
  .saveToCassandra("hdpkns", "batch_measurement")

# Python implementation 
# Adapted from the previously shared code; however, instead of calculating
# the metrics, it just parses the messages.
kafkaStream = KafkaUtils.createDirectStream(
    ssc, [topic], {"metadata.broker.list": brokers})

kafkaStream \
    .transform(parse) \
    .foreachRDD(casssave)


For the same streaming input, the Scala app took an average of ~1.5 seconds to 
handle each event. The Python implementation took an average of ~80 seconds 
per event (and that only after working around a lot of pickle concurrent-access 
issues).

Note that I measured the time as the difference between the event generation 
(just before publishing to Kafka) and the moment just before the 
saveToCassandra call.

The problem in the Python implementation seems to be the delay introduced by 
the foreachRDD(casssave) call, which only runs 
rdd.saveToCassandra( "test_hdpkns", "measurement" ).


Honestly, I was not expecting such a difference between these two 
implementations... Do you understand why this is happening?



Again, Thank you very much for your help,

Best Regards


Sharing my current Scala code below
# Scala Snippet =
val sparkConf = new SparkConf(). // ...
val ssc = new StreamingContext(sparkConf, Seconds(1))
val sc = ssc.sparkContext
// ...
val kstream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
// ...
// handle Kafka messages in a parallel fashion
val ckstream = kstream.map( x => parse(x.value) ).cache()

ckstream
  .foreachRDD( rdd => {
    rdd.foreach(metrics)
  } )

ckstream
  .saveToCassandra("hdpkns", "microbatch_raw_measurement")
#=


On 30/04/2018 14:57:50, Javier Pareja <pareja.jav...@gmail.com> wrote:

Hi Saulo,

If the CPU is close to 100% then you are hitting the limit, and I don't think 
that moving to Scala will make a difference. Both Spark and Cassandra are CPU 
hungry, and your setup is small in terms of CPUs. Try running Spark on another 
(physical) machine so that the 2 cores are dedicated to Cassandra.

Kind Regards
Javier




Re: [Spark2.1] SparkStreaming to Cassandra performance problem

2018-04-30 Thread Saulo Sobreiro
Hi Javier,

I will try to implement this in Scala then. As far as I can see in the 
documentation, there is no saveToCassandra in the Python interface unless you 
are working with DataFrames, and the kafkaStream instance does not provide 
methods to convert an RDD into a DataFrame.

Regarding my table, it is very simple (see below). Can I change something to 
make it write faster?
CREATE TABLE test_hdpkns.measurement (
  mid bigint,
  tt timestamp,
  in_tt timestamp,
  out_tt timestamp,
  sensor_id int,
  measure double,
  PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
) WITH COMPACT STORAGE;
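One schema angle worth checking (a sketch, not a drop-in fix): with PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt), the partition key is mid alone, so every write for a given mid lands on the same partition and replica set. If mid has low cardinality, a composite partition key spreads the write load across the cluster; note also that COMPACT STORAGE is a legacy option. A hypothetical variant (the table name and key choice are illustrative and change which queries are efficient, so adjust to the actual read pattern):

```sql
-- Hypothetical: partition by (mid, sensor_id) to spread writes,
-- cluster by tt within each partition.
CREATE TABLE test_hdpkns.measurement_v2 (
  mid bigint,
  sensor_id int,
  tt timestamp,
  in_tt timestamp,
  out_tt timestamp,
  measure double,
  PRIMARY KEY ((mid, sensor_id), tt)
);
```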

The system CPU while the demo is running is almost always at 100% on both 
cores.


Thank you.

Best Regards,


On 29/04/2018 20:46:30, Javier Pareja <pareja.jav...@gmail.com> wrote:

Hi Saulo,

I meant using this to save:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#writing-to-cassandra-from-a-stream

But it might be slow in a different area. Another point is that Cassandra and 
Spark running on the same machine might compete for resources, which will slow 
down the inserts. You can check the CPU usage of the machine at the time. 
Also, the design of the table schema can make a big difference.


On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro, 
<saulo.sobre...@outlook.pt<mailto:saulo.sobre...@outlook.pt>> wrote:
Hi Javier,


I removed the count and used "map" directly instead of using transform, but 
the kafkaStream is created with KafkaUtils, which does not have a method to 
save to Cassandra directly.

Do you know any workaround for this?


Thank you for the suggestion.

Best Regards,

On 29/04/2018 17:03:24, Javier Pareja 
<pareja.jav...@gmail.com<mailto:pareja.jav...@gmail.com>> wrote:

Hi Saulo,

I'm no expert, but I will give it a try.
I would remove the rdd2.count(): I can't see the point of it, and you will 
gain performance right away. Because of this, I would not use a transform, 
just the map directly.
I have not used Python, but in Scala the cassandra-spark connector can save 
directly to Cassandra without a foreachRDD.

Finally, I would use the Spark UI to find which stage is the bottleneck here.
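The count() point can be illustrated with a plain-Python analogy (not Spark code): counting drains the whole stream just to answer "is it empty?", while peeking at the first element stops immediately. In Spark, the analogous cheap check would be something like RDD.isEmpty(), which inspects only as much data as needed.

```python
def nonempty_by_count(it):
    # Full count: consumes the entire stream just to compare with zero,
    # analogous to calling count() on an RDD.
    return sum(1 for _ in it) > 0

def nonempty_by_peek(it):
    # Peek: touches at most one element, analogous to RDD.isEmpty().
    return next(iter(it), None) is not None

seen = []

def stream(n):
    for i in range(n):
        seen.append(i)
        yield i

assert nonempty_by_count(stream(1000))
count_touched = len(seen)

seen.clear()
assert nonempty_by_peek(stream(1000))
peek_touched = len(seen)

print(count_touched, peek_touched)  # 1000 1
```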

On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, 
<saulo.sobre...@outlook.pt<mailto:saulo.sobre...@outlook.pt>> wrote:

Hi all,

I am implementing a use case where I read some sensor data from Kafka with the 
Spark Streaming interface (KafkaUtils.createDirectStream) and, after some 
transformations, write the output (RDD) to Cassandra.

Everything is working properly, but I am having some trouble with the 
performance. My Kafka topic receives around 2000 messages per second. For a 4 
min. test, the Spark Streaming app takes 6~7 min. to process and write to 
Cassandra, which is not acceptable for longer runs.
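Back-of-the-envelope, those figures imply the app sustains well under the ingest rate (assuming a steady 2000 msg/s and taking the midpoint of the reported 6~7 minutes):

```python
ingest_rate = 2000            # messages per second from Kafka
test_window_s = 4 * 60        # the 4-minute test
total_msgs = ingest_rate * test_window_s

processing_s = 6.5 * 60       # midpoint of the reported 6~7 minutes
sustained_rate = total_msgs / processing_s

print(total_msgs)             # 480000 messages in the window
print(round(sustained_rate))  # 1231 msg/s, well below the 2000 msg/s ingest
```

In other words, the app falls further behind the longer it runs, which is why the gap is unacceptable for longer tests.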

I am running this application in a "sandbox" with 12GB of RAM, 2 cores and 30GB 
SSD space.
Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).

I would like to know if you have any suggestions to improve performance (other 
than getting more resources :) ).

My code (pyspark) is posted at the end of this email so you can take a look. I 
tried some different Cassandra configurations following this link: 
http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark
 (recommended on Stack Overflow for similar questions).


Thank you in advance,

Best Regards,
Saulo



=== # CODE # =

# run command:
# spark2-submit --packages 
org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2
  --conf spark.cassandra.connection.host='localhost' --num-executors 2 
--executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2
##

# Run Spark imports
from pyspark import SparkConf  # SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Run Cassandra imports
import pyspark_cassandra
from pyspark_cassandra import CassandraSparkContext, saveToCassandra

# Standard library import (sys.argv is used below)
import sys

def recordHandler(record):
(mid, tt, in_tt, sid, mv) = parseData( record )
return processMetrics(mid, tt, in_tt, sid, mv)

def process(time, rdd):
rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
if rdd2.count() > 0:
return rdd2

def casssave(time, rdd):
rdd.saveToCassandra( "test_hdpkns", "measurement" )

# ...
brokers, topic = sys.argv[1:]

# ...

sconf = SparkConf() \
.setAppName("SensorDataStreamHandler") \
.setMaster("local[*]") \
.set("spark.default.parallelism", "2")

sc = CassandraSparkContext(conf = sconf)
batchIntervalSeconds = 2
ssc = StreamingContext(sc, batchIntervalSeconds)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], 
{"metadata.broker.list": brokers})

kafkaStream \
.transform(process) \
.foreachRDD(casssave)

ssc.start()
ssc.awaitTermination()
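The snippet leaves parseData and processMetrics undefined. A minimal sketch of what parseData might look like for the CSV-formatted messages, assuming a mid,tt,in_tt,sensor_id,measure field order taken from the table schema (purely illustrative; the real message layout is not shown in the thread):

```python
def parseData(record):
    """Split one CSV-formatted sensor message into typed fields.

    The mid,tt,in_tt,sensor_id,measure order is assumed from the
    Cassandra table schema; adjust to the real message layout.
    """
    mid, tt, in_tt, sid, mv = record.split(",")
    return int(mid), tt, in_tt, int(sid), float(mv)

print(parseData("42,2018-04-28T01:17:00,2018-04-28T01:17:01,7,3.14"))
# (42, '2018-04-28T01:17:00', '2018-04-28T01:17:01', 7, 3.14)
```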
















