Connect to postgresql with pyspark
I am new to PySpark and I am learning it in order to complete my thesis project at university. I am trying to create a DataFrame by reading from a PostgreSQL database table, but I run into a problem when I try to connect my PySpark application to the PostgreSQL server. Could you please explain the steps required to connect to the database successfully? I am using Python 2.7, spark-2.3.0-bin-hadoop2.7, the PyCharm IDE and a Windows environment. What I have done so far is launch a pyspark shell with --jars /path to postgresql jar/ and then run df = sqlContext.read.jdbc(url='jdbc:postgresql://localhost:port/[database]?user='username'='paswd', table='table name')
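A minimal sketch of the connection step, assuming the PostgreSQL JDBC driver jar is on Spark's classpath (passing it with both --jars and --driver-class-path is a commonly suggested fix when the driver class is not found) and that the server listens on the default port 5432. The host, database, table and credential values are hypothetical placeholders. The main idea is to keep user and password in a separate properties dict instead of splicing them into the URL with nested quotes, which is what breaks the URL string in the attempt above.

```python
# Minimal sketch: read a PostgreSQL table into a Spark DataFrame over JDBC.
# Assumes the PostgreSQL JDBC jar is on the classpath, e.g. (hypothetical path):
#   pyspark --jars C:\jars\postgresql.jar --driver-class-path C:\jars\postgresql.jar
# Host/database/table/credentials used with these helpers are placeholders.


def build_jdbc_options(host, port, database, user, password):
    """Return (url, properties) suitable for DataFrameReader.jdbc()."""
    # Plain URL without credentials; .format() keeps this Python 2.7 compatible.
    url = "jdbc:postgresql://{0}:{1}/{2}".format(host, port, database)
    properties = {
        "user": user,
        "password": password,
        # Driver class shipped inside the PostgreSQL JDBC jar.
        "driver": "org.postgresql.Driver",
    }
    return url, properties


def read_table(spark, url, table, properties):
    """Load one table; `spark` may be a SparkSession or a SQLContext."""
    return spark.read.jdbc(url=url, table=table, properties=properties)
```

For example, in the pyspark shell: url, props = build_jdbc_options("localhost", 5432, "thesis_db", "username", "paswd"), then df = read_table(sqlContext, url, "public.my_table", props) and df.show(5), where thesis_db and public.my_table are hypothetical names.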
Re: [Spark2.1] SparkStreaming to Cassandra performance problem
Hi Saulo, I meant using this to save:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/8_streaming.md#writing-to-cassandra-from-a-stream
But the bottleneck might be somewhere else. Another point is that Cassandra and Spark running on the same machine might compete for resources, which will slow down the inserts. You can check the machine's CPU usage while the job is running. The design of the table schema can also make a big difference.

On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro wrote:
> Do you know any workaround for this?
Re: [Spark2.1] SparkStreaming to Cassandra performance problem
Hi Javier, I removed the map and used "map" directly instead of using transform, but the kafkaStream is created with KafkaUtils, which does not have a method to save to Cassandra directly. Do you know any workaround for this? Thank you for the suggestion. Best Regards,

On 29/04/2018 17:03:24, Javier Pareja wrote:
> I have not used python but in Scala the cassandra-spark connector can save directly to Cassandra without a foreachRDD.
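In PySpark, one workaround is to keep foreachRDD and save each micro-batch from there; the part Javier suggests dropping is the transform with its count(). In the original code, rdd2.count() runs an extra Spark job per batch, and since rdd2 is not cached, the save then computes the same map a second time. A minimal sketch of the restructured wiring, assuming (as in the original code) that importing pyspark_cassandra makes saveToCassandra available on RDDs; wire_stream is a hypothetical helper name and record_handler stands for recordHandler from the original post.

```python
# Sketch: map the Kafka DStream directly and save from foreachRDD,
# without transform() and without the extra count() job.
# Assumes `import pyspark_cassandra` (as in the original post) has made
# saveToCassandra available on RDDs. `wire_stream` is a hypothetical helper.


def wire_stream(kafka_stream, record_handler, keyspace, table):
    """Attach parsing and the Cassandra write to a (key, value) Kafka DStream."""
    # createDirectStream yields (key, value) pairs; only the value is parsed.
    parsed = kafka_stream.map(lambda kv: record_handler(kv[1]))

    def save_batch(rdd):
        # The write is the only action per micro-batch, so the mapped
        # records are computed once instead of once for count() and once for the save.
        rdd.saveToCassandra(keyspace, table)

    # PySpark's foreachRDD accepts a one-argument function (rdd only).
    parsed.foreachRDD(save_batch)
    return parsed
```

Usage would replace the transform/foreachRDD chain: wire_stream(kafkaStream, recordHandler, "test_hdpkns", "measurement"), followed by ssc.start() and ssc.awaitTermination() as before.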
Re: [Spark2.1] SparkStreaming to Cassandra performance problem
Hi Saulo, I'm no expert but I will give it a try. I would remove the rdd2.count(); I can't see the point of it, and you will gain performance right away. Because of this, I would not use a transform, just the map directly. I have not used Python, but in Scala the cassandra-spark connector can save directly to Cassandra without a foreachRDD. Finally, I would use the Spark UI to find which stage is the bottleneck here.

On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro wrote:
> Hi all,
>
> I am implementing a use case where I read some sensor data from Kafka with
> the SparkStreaming interface (*KafkaUtils.createDirectStream*) and, after
> some transformations, write the output (RDD) to Cassandra.
>
> Everything is working properly, but I am having some trouble with the
> performance. My Kafka topic receives around 2000 messages per second. For a
> 4 min. test, the SparkStreaming app takes 6~7 min. to process and write to
> Cassandra, which is not acceptable for longer runs.
>
> I am running this application in a "sandbox" with 12GB of RAM, 2 cores and
> 30GB SSD space.
> Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).
>
> I would like to know if you have any suggestions to improve performance
> (other than getting more resources :) ).
>
> My code (pyspark) is posted at the end of this email so you can take a
> look. I tried some different Cassandra configurations following this link:
> http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark
> (recommended on Stack Overflow for similar questions).
>
> Thank you in advance,
>
> Best Regards,
> Saulo
>
> === # CODE # =
>
> # run command:
> # spark2-submit --packages
> #   org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2
> #   --conf spark.cassandra.connection.host='localhost' --num-executors 2
> #   --executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2
> ##
>
> # Run Spark imports
> from pyspark import SparkConf # SparkContext, SparkConf
> from pyspark.streaming import StreamingContext
> from pyspark.streaming.kafka import KafkaUtils
>
> # Run Cassandra imports
> import pyspark_cassandra
> from pyspark_cassandra import CassandraSparkContext, saveToCassandra
>
> def recordHandler(record):
>     (mid, tt, in_tt, sid, mv) = parseData( record )
>     return processMetrics(mid, tt, in_tt, sid, mv)
>
> def process(time, rdd):
>     rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
>     if rdd2.count() > 0:
>         return rdd2
>
> def casssave(time, rdd):
>     rdd.saveToCassandra( "test_hdpkns", "measurement" )
>
> # ...
> brokers, topic = sys.argv[1:]
>
> # ...
>
> sconf = SparkConf() \
>     .setAppName("SensorDataStreamHandler") \
>     .setMaster("local[*]") \
>     .set("spark.default.parallelism", "2")
>
> sc = CassandraSparkContext(conf = sconf)
> batchIntervalSeconds = 2
> ssc = StreamingContext(sc, batchIntervalSeconds)
>
> kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
>     {"metadata.broker.list": brokers})
>
> kafkaStream \
>     .transform(process) \
>     .foreachRDD(casssave)
>
> ssc.start()
> ssc.awaitTermination()
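The figures in the original post translate into a rough per-batch budget. A back-of-the-envelope sketch using only those figures (2000 messages/s, a 4 min test, 6 to 7 min to finish, 2 s batches) and assuming a steady input rate; batch_budget is a hypothetical helper name.

```python
# Back-of-the-envelope check for a micro-batch streaming job,
# assuming records arrive at a steady rate for the whole test.


def batch_budget(rate_per_s, test_s, elapsed_s, batch_s):
    """Compare the input rate with what the job actually sustained."""
    total_records = rate_per_s * test_s
    # Records actually processed per second over the whole run.
    achieved_rate = total_records / float(elapsed_s)
    # Records landing in each micro-batch at the input rate.
    records_per_batch = rate_per_s * batch_s
    # Average processing time spent per batch interval of input data.
    avg_batch_time_s = batch_s * elapsed_s / float(test_s)
    return {
        "total_records": total_records,
        "achieved_rate": achieved_rate,
        "records_per_batch": records_per_batch,
        "avg_batch_time_s": avg_batch_time_s,
    }
```

With elapsed times of 6 and 7 minutes this gives 480,000 records in total, an achieved rate of roughly 1,143 to 1,333 records/s against 2,000 arriving, 4,000 records per batch, and an average of 3.0 to 3.5 s of work per 2 s batch. Every batch therefore overruns its interval and the backlog keeps growing, which should be visible as rising scheduling delay in the Spark UI.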
Re: A naive ML question
The transactions probably describe from which counterparty assets are transferred to which other counterparty at the different stages of the transaction. You could use GraphX for that if its algorithms suit your needs. I am still trying to understand what you mean by "evolve over time". E.g., a counterparty has cancelled a lot of transactions, or something like that? Normally it looks like you have a rather straightforward state machine for your transactions.

> On 29. Apr 2018, at 12:18, kant kodali wrote:
>
> The problem I am trying to solve is that when a bunch of financial transactional data is thrown at me, I am trying to identify all possible relationships and lineage among them without explicitly specifying what the relationships are among transactions.
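Jörn's suggestion amounts to treating counterparties as vertices and transfers as edges. A small pure-Python sketch of that edge construction, assuming each transaction record carries hypothetical from_party and to_party fields. In PySpark, the same (src, dst) rows are the shape GraphFrames expects for its edges DataFrame (columns src and dst); GraphX is the Scala-side equivalent.

```python
from collections import Counter


def counterparty_edges(transactions):
    """Aggregate transfers into weighted directed edges (src, dst, count).

    `transactions` is an iterable of dicts with hypothetical keys
    "from_party" and "to_party".
    """
    counts = Counter((t["from_party"], t["to_party"]) for t in transactions)
    # Sorted (src, dst, weight) rows, ready to become an edge table.
    return sorted((src, dst, n) for (src, dst), n in counts.items())


def out_neighbours(edges):
    """Adjacency list: counterparty -> set of counterparties it transfers to."""
    adj = {}
    for src, dst, _ in edges:
        adj.setdefault(src, set()).add(dst)
    return adj
```

The edge weights (number of transfers between two parties) are one simple way to surface strong relationships before running any graph algorithm.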
Re: Do GraphFrames support streaming?
What is the use case you are trying to solve? If you want to load graph data from a streaming window into separate graphs, that is possible but probably requires a lot of memory. If you want to update an existing graph with new streaming data and then fully rerun an algorithm, look at JanusGraph. If you want to incrementally update an existing graph and incrementally run a graph algorithm suited to this, as far as I am aware you have to implement that yourself.

> On 29. Apr 2018, at 11:43, kant kodali wrote:
>
> Do GraphFrames support streaming?
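For the third case, one graph algorithm that updates incrementally without much effort is connected components: a union-find (disjoint-set) structure absorbs each new batch of edges without recomputing from scratch. A minimal pure-Python sketch, not a GraphFrames or Spark API, assuming edges arrive as (src, dst) pairs per micro-batch; it handles edge additions only, not deletions.

```python
class IncrementalComponents(object):
    """Union-find over vertex ids; supports adding edges, not removing them."""

    def __init__(self):
        self.parent = {}
        self.rank = {}

    def _find(self, v):
        # Register unseen vertices as their own singleton component.
        if v not in self.parent:
            self.parent[v] = v
            self.rank[v] = 0
        root = v
        while self.parent[root] != root:
            root = self.parent[root]
        # Path compression: point every visited vertex straight at the root.
        while self.parent[v] != root:
            self.parent[v], v = root, self.parent[v]
        return root

    def add_edges(self, edges):
        """Merge the components touched by one batch of (src, dst) edges."""
        for src, dst in edges:
            a, b = self._find(src), self._find(dst)
            if a == b:
                continue
            # Union by rank keeps the trees shallow.
            if self.rank[a] < self.rank[b]:
                a, b = b, a
            self.parent[b] = a
            if self.rank[a] == self.rank[b]:
                self.rank[a] += 1

    def same_component(self, u, v):
        return self._find(u) == self._find(v)

    def components(self):
        """All components as sorted lists of vertex ids."""
        groups = {}
        for v in list(self.parent):
            groups.setdefault(self._find(v), set()).add(v)
        return sorted(sorted(g) for g in groups.values())
```

Each micro-batch would call add_edges with that batch's edges; the state lives on the driver here, so this only fits graphs whose vertex set fits in one process's memory.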
Re: A naive ML question
Maybe not necessarily what you want, but based on the transaction attributes you could find the initial state and the end state of each transaction and give them to a decision tree, to figure out whether those attributes let you predict the final state. Again, not what you asked, but an idea for using ML on your data? Kr

On Sun, Apr 29, 2018, 10:22 AM Nick Pentreath wrote:
> One potential approach could be to construct a transition matrix showing
> the probability of moving from each state to another state.
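The first step of that idea is to collapse each transaction's state history into one row holding its initial state and its final state (the label a classifier would predict). A minimal pure-Python sketch, assuming a hypothetical event log of (txn_id, timestamp, state) tuples; joining the result with the transaction attributes and training a classifier (for example pyspark.ml.classification.DecisionTreeClassifier) is not shown.

```python
def initial_and_final_states(events):
    """Map txn_id -> (initial_state, final_state) from (txn_id, ts, state) events.

    Events may arrive in any order; the earliest and latest timestamps win.
    """
    first = {}
    last = {}
    for txn_id, ts, state in events:
        if txn_id not in first or ts < first[txn_id][0]:
            first[txn_id] = (ts, state)
        if txn_id not in last or ts > last[txn_id][0]:
            last[txn_id] = (ts, state)
    return {t: (first[t][1], last[t][1]) for t in first}
```

Each resulting (initial, final) pair, plus the transaction's attributes, gives one labelled training example.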
is there a minOffsetsTrigger in spark structured streaming 2.3.0?
Hi All, just like maxOffsetsPerTrigger, is there a minOffsetsTrigger in Spark Structured Streaming 2.3.0? Thanks!
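For reference, the Kafka source of Structured Streaming exposes the upper bound as the maxOffsetsPerTrigger option. A minimal sketch of how that existing option is set, assuming an active SparkSession named spark, the spark-sql-kafka-0-10 package on the classpath, and hypothetical broker and topic names; it shows only the documented maximum and does not imply that a minimum counterpart exists.

```python
# Sketch: configure the Structured Streaming Kafka source with an upper
# bound on offsets per trigger. Broker/topic names are hypothetical.


def kafka_source_options(bootstrap_servers, topic, max_offsets_per_trigger):
    """Options for Structured Streaming's Kafka source (spark-sql-kafka-0-10)."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        # Upper bound on the total offsets processed per trigger; Spark splits
        # it proportionally across the topic's partitions.
        "maxOffsetsPerTrigger": str(max_offsets_per_trigger),
    }


def read_kafka_stream(spark, options):
    """Build a streaming DataFrame; `spark` is an active SparkSession."""
    reader = spark.readStream.format("kafka")
    for key, value in options.items():
        reader = reader.option(key, value)
    return reader.load()
```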
Re: A naive ML question
Hi Nick, Thanks for that idea!! Just to be more clear: the problem I am trying to solve is that when a bunch of financial transactional data is thrown at me, I am trying to identify all possible relationships and lineage among them without explicitly specifying what the relationships are among transactions.

On Sun, Apr 29, 2018 at 2:22 AM, Nick Pentreath wrote:
> One potential approach could be to construct a transition matrix showing
> the probability of moving from each state to another state.
Do GraphFrames support streaming?
Do GraphFrames support streaming?
Re: A naive ML question
One potential approach could be to construct a transition matrix showing the probability of moving from each state to another state. This can be visualized with a “heat map” encoding (I think matshow in numpy/matplotlib does this).

On Sat, 28 Apr 2018 at 21:34, kant kodali wrote:
> Hi,
>
> I mean a transaction typically goes through different states like
> STARTED, PENDING, CANCELLED, COMPLETED, SETTLED etc...
>
> Thanks,
> kant
>
> On Sat, Apr 28, 2018 at 4:11 AM, Jörn Franke wrote:
>
>> What do you mean by “how it evolved over time”? A transaction basically
>> describes an action at a certain point in time. Do you mean how a financial
>> product evolved over time given a set of transactions?
>>
>> > On 28. Apr 2018, at 12:46, kant kodali wrote:
>> >
>> > Hi All,
>> >
>> > I have a bunch of financial transactional data and I was wondering if
>> > there is any ML model that can give me a graph structure for this data?
>> > In other words, show how a transaction evolved over time?
>> >
>> > Any suggestions or references would help.
>> >
>> > Thanks!
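Nick's transition-matrix idea can be sketched in a few lines: count the observed state-to-state moves across all transaction histories, then normalise each row into probabilities. A minimal pure-Python sketch, assuming each history is an ordered list of state names such as STARTED, PENDING, CANCELLED, COMPLETED, SETTLED; transition_matrix is a hypothetical helper name.

```python
def transition_matrix(histories, states):
    """Row-stochastic matrix P with P[i][j] = P(next = states[j] | current = states[i]).

    `histories` is an iterable of ordered state sequences, one per transaction.
    Rows for states that are never left (e.g. terminal states) stay all zero.
    """
    index = {s: i for i, s in enumerate(states)}
    n = len(states)
    counts = [[0] * n for _ in range(n)]
    for seq in histories:
        # Count each consecutive (current, next) pair in the history.
        for cur, nxt in zip(seq, seq[1:]):
            counts[index[cur]][index[nxt]] += 1
    matrix = []
    for row in counts:
        total = float(sum(row))
        matrix.append([c / total if total else 0.0 for c in row])
    return matrix
```

For the heat map, the returned list of rows can be passed directly to matplotlib.pyplot.matshow, with the state names as tick labels.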