Hi Javier,

I will try to implement this in scala then. As far as I can see in the 
documentation there is no SaveToCassandra in the python interface unless you 
are working with dataframes and the kafkaStream instance does not provide 
methods to convert an RDD into DF.

Regarding my table, it is very simple (see below). Can I change something to 
make it write faster?
CREATE TABLE test_hdpkns.measurement (
  mid bigint,
  tt timestamp,
  in_tt timestamp,
  out_tt timestamp,
  sensor_id int,
  measure double,
  PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
) with compact storage;

The system CPU while the demo is running is almost always at 100% for both 
cores.


Thank you.

Best Regards,


On 29/04/2018 20:46:30, Javier Pareja <pareja.jav...@gmail.com> wrote:

Hi Saulo,

I meant using this to save:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#writing-to-cassandra-from-a-stream

But it might be slow on a different area.
Another point is that Cassandra and spark running on the same machine might 
compete for resources which will slow down the insert. You can check the CPU 
usage of the machine at the time. Also the design of the table schema can make 
a big difference.


On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro, 
<saulo.sobre...@outlook.pt<mailto:saulo.sobre...@outlook.pt>> wrote:
Hi Javier,


I removed the map and used "map" directly instead of using transform, but the 
kafkaStream is created with KafkaUtils which does not have a method to save to 
cassandra directly.

Do you know any workarround for this?


Thank you for the suggestion.

Best Regards,

On 29/04/2018 17:03:24, Javier Pareja 
<pareja.jav...@gmail.com<mailto:pareja.jav...@gmail.com>> wrote:

Hi Saulo,

I'm no expert but I will give it a try.
I would remove the rdd2.count(), I can't see the point and you will gain 
performance right away. Because of this, I would not use a transform, just 
directly the map.
I have not used python but in Scala the cassandra-spark connector can save 
directly to Cassandra without a foreachRDD.

Finally I would use the spark UI to find which stage is the bottleneck here.

On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, 
<saulo.sobre...@outlook.pt<mailto:saulo.sobre...@outlook.pt>> wrote:

Hi all,

I am implementing a use case where I read some sensor data from Kafka with 
SparkStreaming interface (KafkaUtils.createDirectStream) and, after some 
transformations, write the output (RDD) to Cassandra.

Everything is working properly but I am having some trouble with the 
performance. My kafka topic receives around 2000 messages per second. For a 4 
min. test, the SparkStreaming app takes 6~7 min. to process and write to 
Cassandra, which is not acceptable for longer runs.

I am running this application in a "sandbox" with 12GB of RAM, 2 cores and 30GB 
SSD space.
Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).

I would like to know you have some suggestion to improve performance (other 
than getting more resources :) ).

My code (pyspark) is posted in the end of this email so you can take a look. I 
tried some different cassandra configurations following this link: 
http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark
 (recommended in stackoverflow for similar questions).


Thank you in advance,

Best Regards,
Saulo



=============== # CODE # =================================
####
# run command:
# spark2-submit --packages 
org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2
  --conf spark.cassandra.connection.host='localhost' --num-executors 2 
--executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2
##

# Run Spark imports
from pyspark import SparkConf # SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Run Cassandra imports
import pyspark_cassandra
from pyspark_cassandra import CassandraSparkContext, saveToCassandra

def recordHandler(record):
    (mid, tt, in_tt, sid, mv) = parseData( record )
    return processMetrics(mid, tt, in_tt, sid, mv)

def process(time, rdd):
    rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
    if rdd2.count() > 0:
        return rdd2

def casssave(time, rdd):
    rdd.saveToCassandra( "test_hdpkns", "measurement" )

# ...
brokers, topic = sys.argv[1:]

# ...

sconf = SparkConf() \
        .setAppName("SensorDataStreamHandler") \
        .setMaster("local[*]") \
        .set("spark.default.parallelism", "2")

sc = CassandraSparkContext(conf = sconf)
batchIntervalSeconds = 2
ssc = StreamingContext(sc, batchIntervalSeconds)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], 
{"metadata.broker.list": brokers})

kafkaStream \
    .transform(process) \
    .foreachRDD(casssave)

ssc.start()
ssc.awaitTermination()

================================================




Reply via email to