Re: Spark SQL JDBC Connectivity and more
1) If I have a standalone Spark application that has already built an RDD, how can SharkServer2, or for that matter Shark, access 'that' RDD and run queries on it? In all the examples I have seen for Shark, the RDDs (tables) are created within Shark's own SparkContext and processed there.

This is not possible out of the box with Shark. If you look at the code for SharkServer2, though, you'll see that it's just a standard HiveContext under the covers. If you modify this startup code, any SchemaRDD you register as a table in this context will be exposed over JDBC.

[Venkat] Are you saying: pull the SharkServer2 code into my standalone Spark application (as part of the standalone application process), pass the standalone app's SparkContext to SharkServer2 at startup, and voilà, we get SQL/JDBC interfaces for the standalone app's RDDs, exposed as tables? Thanks for the clarification.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-JDBC-Connectivity-tp6511p7264.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Partitioner to process data in the same order for each key
I have a data file that I need to process using Spark. The file has multiple events for different users, and I need to process the events for each user in the order they appear in the file:

User 1: Event 1
User 2: Event 1
User 1: Event 2
User 3: Event 1
User 2: Event 2
User 3: Event 2
etc.

I want to make sure User 1's events, User 2's events, User 3's events, and so on are each processed sequentially, in the same order they appear in the file (I don't care whether order is maintained across users). That is, process User 1's Event 1, Event 2, Event 3 in that order on one Spark node, and do the same for User 2, etc.

We used the user id as the key (it is unique) and expected the default partitioner, the hash partitioner, to always send user i's events to one particular node (so that the order of events for that user is maintained). In reality that was not happening: we saw events for the same user getting distributed to different nodes. Either my understanding of the hash partitioner is incorrect, or we are making some mistake.

Is there any standard partitioner that Spark supports that we can use if the hash partitioner is not the right one for this use case? Or do we write our own partitioner? If we need to write a new partitioner, can someone give pseudocode for this use case to help us?

Regards,
Venkat

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Partioner-to-process-data-in-the-same-order-for-each-key-tp10977.html
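As a sketch of what writing your own partitioner involves: in a real job this would be a class extending org.apache.spark.Partitioner whose getPartition returns a stable partition index for each user id. The mapping itself is just a non-negative hashCode-modulo, shown here as plain Scala (the names are illustrative, not Spark API):

```scala
// Sketch of the key -> partition mapping a custom partitioner would use.
// In a real Spark job this logic would live in a class extending
// org.apache.spark.Partitioner; here it is plain Scala so the mapping
// itself is easy to inspect. All names are illustrative.
object UserIdPartitioning {
  // A mod that is never negative, even when hashCode is negative.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Every record with the same user id maps to the same partition index.
  def getPartition(userId: String, numPartitions: Int): Int =
    nonNegativeMod(userId.hashCode, numPartitions)
}
```

Note that keeping all of a user's events in one partition guarantees co-location, not ordering within the partition: to preserve file order you would additionally need to carry along each event's original position (e.g. via zipWithIndex) and sort by it within each partition.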
Re: Spark SQL JDBC Connectivity
For the time being, we decided to take a different route. We created a REST API layer in our app and allow SQL queries to be passed in via REST. Internally we hand that query to the Spark SQL layer over the RDD and return the results. With this, Spark SQL is now supported for our RDDs via the REST API. It was easy to do, took just a few hours, and it works for our use case.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-JDBC-Connectivity-tp6511p10986.html
Re: streaming window not behaving as advertised (v1.0.1)
TD, we are seeing the same issue. We struggled with this until we found this post and the workaround. A quick fix in the Spark Streaming software would help a lot of others who are encountering this and pulling their hair out over why RDDs on some partitions are not computed (we ended up spending weeks trying to figure out what was happening and trying different things). This issue has been around from 0.9 to date (1.0.1) at least.

Thanks,
Venkat

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/streaming-window-not-behaving-as-advertised-v1-0-1-tp10453p11163.html
UpdateStateByKey - How to improve performance?
In Spark Streaming, the method

def updateStateByKey[S: ClassTag](
    updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)]

takes a DStream[(K, V)] and produces a DStream[(K, S)].

We have an input DStream[(K, V)] with 40,000 elements. We update on average 1,000 of them in every 3-second batch, but based on how updateStateByKey is defined, we loop through all 40,000 elements to make an update to just 1,000, leaving the other 39,000 untouched. I think looping through those extra 39,000 elements is a waste of performance. Isn't there a more efficient way: build a hash map of only the 1,000 elements that need updating and update just those, without looping through the unwanted elements? Shouldn't there be a streaming update function that updates selected members, or are we missing some concept here?

I think updateStateByKey may be causing a lot of performance degradation in our app, since we do this again and again for every batch. Please let us know if my thought process is correct here.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/UpdateStateByKey-How-to-improve-performance-tp11575.html

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
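To make the cost concrete, here is a plain-Scala model of what updateStateByKey does each batch (illustrative only, not Spark's actual implementation): the update function is invoked once for every tracked key, including keys that received no new values in that batch.

```scala
// Plain-Scala model of updateStateByKey's per-batch behavior (illustrative,
// not Spark's implementation): the update function runs once per tracked
// key, including keys with no new values this batch.
object UpdateStateModel {
  // A typical update function: fold this batch's new values into the state.
  val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
    (newValues, state) => Some(state.getOrElse(0) + newValues.sum)

  def batchUpdate(state: Map[String, Int],
                  batch: Map[String, Seq[Int]]): Map[String, Int] =
    // Iterates over ALL tracked keys, not just the keys present in `batch`
    // -- this is the 40,000-vs-1,000 overhead described above.
    (state.keySet ++ batch.keySet).map { k =>
      k -> updateFunc(batch.getOrElse(k, Seq.empty), state.get(k)).get
    }.toMap
}
```

(For what it's worth, later Spark releases added DStream.mapWithState, which touches only the keys present in the current batch, precisely to avoid this full scan.)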
Spark SQL table Join, one task is taking long
Environment: Spark 1.1; 4-node Spark and Hadoop dev cluster; 6 cores and 32 GB RAM each; default serialization; standalone mode; no security.

Data was sqooped from a relational DB to HDFS and is partitioned uniformly across HDFS. I am reading a fact table of about 8 GB and one small dim table from HDFS and joining them on a criterion. The driver runs in the Spark shell on the Spark master. ContactDetail and DAgents are read as RDDs and already registered as tables. Each of these tables has 60 to 90 fields, and I am using a Product class.

val CDJoinQry = sqlContext.sql("SELECT * FROM ContactDetail, DAgents WHERE ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")
CDJoinQry.map(ta => ta(4)).count // result is a small number

This works fine and returns the result correctly. The Hadoop mapPartition reads and RDD creation are all fine. But in the count stage, I see that one task (out of 200) does a huge amount of shuffle write (1 GB or more) and takes about 1.1 minutes to complete, out of 1.2 minutes of total execution time. This task usually falls around three-quarters of the way through the total tasks (say 160/200). While that task runs, one CPU on one worker node sits at 100% for its duration. The rest of the tasks take a few ms each and do less than 5 MB of shuffle write.

I have run it repeatedly, and this happens regardless of which worker node that particular task runs on. I turned on Spark debug logging on all nodes, but it was difficult to figure out from the logs where the delay comes from; there are no errors or retries in them. Not sure if I can post logs here for someone to look at; if so, I can (about 10 MB). Also not sure whether it is normal in such a table join for one task to take most of the time. Let me know if you have any suggestions.
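One possible reading of the symptom, sketched in plain Scala (illustrative, no Spark dependencies): because the predicate keeps only rows with f6 = 902, every surviving row carries the same join key, and hash partitioning for the shuffle sends them all to a single reduce partition, i.e. one long task.

```scala
// Illustrative sketch: when all surviving rows share one join key, hash
// partitioning sends them all to a single reduce partition, so one task
// does almost all the shuffle work while the others stay nearly idle.
object JoinSkew {
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Count how many rows land in each of `numPartitions` partitions when
  // partitioning by the join key.
  def partitionSizes(keys: Seq[Int], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(k => nonNegativeMod(k.hashCode, numPartitions))
      .map { case (p, ks) => p -> ks.size }
}
```

If that is what is happening, it is data skew rather than a scheduler problem; filtering the fact table before the join, or making sure the small dimension table is broadcast so the fact table need not be shuffled, would avoid the hot partition.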
Regards,
Venkat

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124.html
Scala Dependency Injection
This is more of a Scala question than a Spark question. Which dependency injection framework do you use for Scala when using Spark? Is http://scaldi.org/ recommended?

Regards,
Venkat

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Scala-Dependency-Injection-tp20185.html
Re: Spark SQL table Join, one task is taking long
Bumping this up. Michael Armbrust, anybody from the Spark SQL team?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124p20218.html
Re: Spark SQL table Join, one task is taking long
Hi Cheng,

Thank you very much for taking the time to provide a detailed explanation. I tried a few of the things you suggested, and some more. The ContactDetail table (8 GB) is the fact table and DAgents is the dim table (<500 KB), the reverse of what you were assuming, but your ideas still apply. I tried the following:

a) Cached the smaller dim table in memory:

sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "1000")
sqlContext.cacheTable("DAgents")

The UI (Stage -> Storage) shows it cached as an RDD when I run it.

val CDJoinQry = sqlContext.sql("SELECT * FROM ContactDetail, DAgents WHERE ContactDetail.f6 = DAgents.f1 and DAgents.f1 = 902")
CDJoinQry.map(ta => ta(4)).count

I see no difference in performance; the query takes the same ~1.2 min.

b) I reversed both the order of the tables and the where clause in the query:

val CDJoinQry = sqlContext.sql("SELECT * FROM DAgents, ContactDetail WHERE DAgents.f1 = 902 and DAgents.f1 = ContactDetail.f6")

Performance went bad: it took 6-7 min to complete. Changing only the table order in the SELECT for this join, keeping the same where-clause order, gave similar performance (1.2-1.4 min).

c) Using the query from a), I tried to keep the storage in columnar format with:

sqlContext.setConf("spark.sql.inMemoryColumnarStorage.compressed", "true")

I see no difference in performance; the query takes the same ~1.2 min. Not sure it even takes effect.

d) I tried converting the comma-separated HDFS files to Parquet format in HDFS, reading them as Parquet, and then running the query on that:
DAgents.saveAsParquetFile("DAgents.parquet")
FCDRDD.saveAsParquetFile("ContactDetail.parquet")

val DAgentsParquetRDD = sqlContext.parquetFile("DAgents.parquet")
DAgentsParquetRDD.registerAsTable("DAgentsParquet")

val FContactDetailParquetRDD = sqlContext.parquetFile("ContactDetail.parquet")
FContactDetailParquetRDD.registerAsTable("ContactDetailParquet")

val CDJoinQryParquet = sqlContext.sql("SELECT * FROM ContactDetailParquet, DAgentsParquet WHERE ContactDetailParquet.f6 = DAgentsParquet.f1 and DAgentsParquet.f1 = 902")
CDJoinQryParquet.map(ta => ta(4)).count

*The query time is actually longer for this join query.* It took 3.4 min, with more data read (2 GB) in shuffle reads. Parquet performed worse than non-Parquet for this join.

I then reversed the table order and where clause and ran it against Parquet:

val CDJoinQryParquetReversed = sqlContext.sql("SELECT * FROM DAgentsParquet, ContactDetailParquet WHERE DAgentsParquet.f1 = 902 and DAgentsParquet.f1 = ContactDetailParquet.f6")
CDJoinQryParquetReversed.map(ta => ta(4)).count

It ran for more than 18 min and I had to kill it, as it kept on running.

*But for queries with no join, Parquet's performance was extremely good.* For example, this join-free query ran in 8 seconds, whereas the same query on non-Parquet data took 30 seconds:

val CDJoinQryParquet0 = sqlContext.sql("SELECT * FROM ContactDetailParquet WHERE ContactDetailParquet.f6 = 902")
CDJoinQryParquet0.map(ta => ta(4)).count

*Some potential conclusions (please comment):*

* The order of the where clause seems to matter to the Spark SQL optimizer. In the relational DBs I have worked with, where-clause order is typically only a hint. It would be nice if the Spark SQL optimizer ignored clause order and optimized automatically.
* Just changing the table order in the SELECT statement for a join also seems to matter when reading data from HDFS (for Parquet, and to a lesser extent for non-Parquet in my case), even when the where-clause order is the same. It would be nice if the SQL optimizer handled this automatically.

* Table joins on huge tables are costly. The fact and dimension concepts from star schemas don't translate well to big data (Hadoop, Spark). It may be better to denormalize and store huge tables to avoid joins; joins seem to be evil. (We have tried denormalizing when using Cassandra, but that has its own problem: ad-hoc queries result in full table scans when the keys are not known.)

Regards,
Venkat

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-table-Join-one-task-is-taking-long-tp20124p20389.html
Re: dockerized spark executor on mesos?
We have dockerized the Spark master and worker(s) separately and are using them in our dev environment. We don't use Mesos, though; we run in standalone mode. But adding Mesos should not be that difficult, I think.

Regards,
Venkat

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/dockerized-spark-executor-on-mesos-tp20276p20603.html
Spark SQL JDBC Connectivity
We are planning to use the latest Spark SQL on RDDs. If a third-party application wants to connect to Spark via JDBC, does Spark SQL support that? (We want to avoid going through the Shark/Hive JDBC layer, as we need good performance.)

BTW, we also want to do the same for Spark Streaming: will Spark SQL work on DStreams (since the underlying structure is an RDD anyway), and can we expose the streaming DStream RDDs through JDBC via Spark SQL for real-time analytics?

Any pointers on this will help greatly.

Regards,
Venkat

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-JDBC-Connectivity-tp6511.html
Re: Spark SQL JDBC Connectivity and more
Thanks Michael. OK, will try SharkServer2. But I have some basic questions on a related area:

1) If I have a standalone Spark application that has already built an RDD, how can SharkServer2, or for that matter Shark, access 'that' RDD and run queries on it? In all the examples I have seen for Shark, the RDDs (tables) are created within Shark's own SparkContext and processed there. I have stylized the real problem we have, which is: we have a standalone Spark application that processes DStreams and produces output DStreams. I want to expose that near-real-time DStream data to a 3rd-party app via JDBC, and allow the SharkServer2 CLI to operate on and query the DStreams in real time, all from memory. Currently we write the output stream to Cassandra and expose it to the 3rd-party app through it via JDBC, but we want to avoid that extra disk write, which increases latency.

2) I have two applications: one for processing and computing an output RDD from an input, and another for post-processing the resultant RDD into multiple persistent stores plus doing other things with it. These are intentionally split into separate processes. How do we share the output RDD from the first application with the second without writing to disk? (We are thinking of serializing the RDD and streaming it through Kafka, but then we lose time and all the fault tolerance that RDDs bring.) Is Tachyon the only other way? Are there other models/design patterns for applications that share RDDs, as this may be a very common use case?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-JDBC-Connectivity-tp6511p6543.html