Spark SQL queries on Hive tables: real time?
Hello, I'm actually asking myself about the performance of using Spark SQL with Hive for real-time analytics. I know that Hive was created for batch processing, and Spark is used for fast queries. But will using Spark SQL with Hive allow me to do real-time queries? Or will it just make queries faster, but not real time? Should I use another data warehouse, like HBase? Thanks in advance for your time and consideration, Florian -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-queries-hive-table-real-time-tp23642.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException in spark with mysql database
Hi! I am trying to load data from my MySQL database using the following code:

val query = "select * from " + table
val url = "jdbc:mysql://" + dataBaseHost + ":" + dataBasePort + "/" + dataBaseName + "?user=" + db_user + "&password=" + db_pass
val sc = new SparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)
val options = new HashMap[String, String]()
options.put("driver", "com.mysql.jdbc.Driver")
options.put("url", url)
options.put("dbtable", query)
options.put("numPartitions", "1")
sqlContext.load("jdbc", options)

And I get the following exception:

Exception in thread "main" com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'select * from tempTable WHERE 1=0'
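The 'WHERE 1=0' in the error is the clue: the JDBC data source builds a zero-row schema-probing query by substituting the dbtable option directly into SELECT * FROM ... WHERE 1=0, so a bare query is invalid there. A minimal Python sketch of that substitution (the probe shape is inferred from the error text above, not copied from Spark's source):

```python
def schema_probe(dbtable):
    """Mimic how a JDBC data source probes a table's schema: the
    'dbtable' option is substituted directly into a zero-row query."""
    return "SELECT * FROM {} WHERE 1=0".format(dbtable)

# Passing a bare query yields invalid SQL, matching the reported error:
bad = schema_probe("select * from tempTable")

# Parenthesizing the query and giving it an alias keeps the SQL valid,
# which is the usual fix for the 'dbtable' option:
good = schema_probe("(select * from tempTable) AS t")
```

So setting options.put("dbtable", "(" + query + ") AS t") should avoid the syntax error.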
Re: Split RDD into two in a single pass
This comes up so often. I wonder if the documentation or the API could be changed to answer this question. The solution I found is from http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job. You basically write the items into two directories in a single pass through the RDD, then read the two directories back as two RDDs. It avoids traversing the RDD twice, but writing to and reading from the file system is also costly, so it may not always be worth it. On Mon, Jul 6, 2015 at 9:32 AM, Anand Nalya anand.na...@gmail.com wrote: Hi, I've an RDD which I want to split into two disjoint RDDs with a boolean function. I can do this with the following: val rdd1 = rdd.filter(f) val rdd2 = rdd.filter(fnot) I'm assuming that each of the above statements will traverse the RDD once, thus resulting in 2 passes. Is there a way of doing this in a single pass over the RDD, so that when f returns true the element goes to rdd1, and to rdd2 otherwise? Regards, Anand
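For intuition, on a plain local collection the split really is a single traversal, which is exactly what the two-filter RDD version cannot express. A Python sketch of the one-pass routing (illustrative only; this is not an RDD operator):

```python
def split_once(items, pred):
    """Route each element into one of two lists in a single pass."""
    matched, rest = [], []
    for x in items:
        # append to exactly one of the two outputs per element
        (matched if pred(x) else rest).append(x)
    return matched, rest

evens, odds = split_once([1, 2, 3, 4, 5], lambda x: x % 2 == 0)
# evens == [2, 4], odds == [1, 3, 5]
```

The write-to-two-directories trick above buys this single traversal at the price of a round trip through the file system; caching the RDD before running the two filters is the other common trade-off.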
Re: Re: Application jar file not found exception when submitting application
Thanks Shixiong for the reply. Yes, I confirm that the file exists there; I simply checked with ls -l /data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar bit1...@163.com From: Shixiong Zhu Date: 2015-07-06 18:41 To: bit1...@163.com CC: user Subject: Re: Application jar file not found exception when submitting application Before running your script, could you confirm that /data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar exists? You might have forgotten to build this jar. Best Regards, Shixiong Zhu 2015-07-06 18:14 GMT+08:00 bit1...@163.com bit1...@163.com: Hi, I have the following shell script that submits the application to the cluster. But whenever I start the application, I encounter a FileNotFoundException; after retrying several times, I can successfully submit it! SPARK=/data/software/spark-1.3.1-bin-2.4.0 APP_HOME=/data/software/spark-1.3.1-bin-2.4.0/applications $SPARK/bin/spark-submit --deploy-mode cluster --name PssAmStreamingApplication --master spark://com-app1:7077 --driver-memory 1G --executor-memory 4G --total-executor-cores 10 --class com.app.PssAmStreamingApplicationDriver $APP_HOME/pss.am.core-1.0-SNAPSHOT-shaded.jar [root@com-app2 applications]# ./submitApplicationStreaming.sh Running Spark using the REST application submission protocol. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/07/06 18:05:35 INFO StandaloneRestClient: Submitting a request to launch an application in spark://com-app1:7077. Warning: Master endpoint spark://com-app1:7077 was not a REST server. Falling back to legacy submission gateway instead. 15/07/06 18:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Sending launch command to spark://com-app1:7077 Driver successfully submitted as driver-20150706180538-0008 ... waiting before polling master for driver state ... 
polling master for driver state State of driver-20150706180538-0008 is ERROR Exception from cluster was: java.io.FileNotFoundException: File file:/data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar does not exist java.io.FileNotFoundException: File file:/data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar does not exist at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:511) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724) at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501) at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397) at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:464) at org.apache.spark.deploy.worker.DriverRunner.org$apache$spark$deploy$worker$DriverRunner$$downloadUserJar(DriverRunner.scala:146) at org.apache.spark.deploy.worker.DriverRunner$$anon$1.run(DriverRunner.scala:72) bit1...@163.com
Spark equivalent for Oracle's analytical functions
Is there any equivalent of Oracle's analytical functions in Spark SQL? For example, if I have the following data set (say table T):

EID|DEPT
101|COMP
102|COMP
103|COMP
104|MARK

In Oracle, I can do something like

select EID, DEPT, count(1) over (partition by DEPT) CNT from T;

to get:

EID|DEPT|CNT
101|COMP|3
102|COMP|3
103|COMP|3
104|MARK|1

Can we do an equivalent query in Spark SQL? Or what is the best method to get such results with Spark dataframes?
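To make the semantics concrete, here is what count(1) over (partition by DEPT) computes, sketched on a plain local list in Python (a semantic illustration, not a Spark API):

```python
from collections import Counter

rows = [(101, "COMP"), (102, "COMP"), (103, "COMP"), (104, "MARK")]

# Window-function behavior: each row keeps its identity and gains the
# size of its DEPT group; unlike GROUP BY, no rows are collapsed.
dept_counts = Counter(dept for _, dept in rows)
result = [(eid, dept, dept_counts[dept]) for eid, dept in rows]
# [(101, 'COMP', 3), (102, 'COMP', 3), (103, 'COMP', 3), (104, 'MARK', 1)]
```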
[SPARK-SQL] Re-use col alias in the select clause to avoid sub query
Hi, I want to re-use a column alias in the select clause to avoid a sub-query. For example:

select check(key) as b, abs(b) as abs, value1, value2, ..., value30 from test

The query above does not work, because b is not defined in test's schema. Instead, I have to change the query to the following:

select check(key) as b, abs(check(key)) as abs, value1, value2, ..., value30 from test

Apparently, the check function is called twice. In my use case, the check function is time-consuming. The workaround is to use a sub-query:

select b, abs(b) as abs, value1, value2, ..., value30 from ( select check(key) as b, value1, value2, ..., value30 from test ) t

The problem is that I have to repeat the following 30 columns twice. Imagine the following case, which does not work either:

select check(key) as b, abs(b) as absv, tan(absv) as tanv, value1, value2, ..., value30 from test

In order not to call my check function many times, I would need to change the query into 3 nested sub-queries, which makes the query too long and hard to read. I am wondering whether we can reuse a column alias in an efficient way? Thank you
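Outside of rewriting the SQL, one general remedy when an expensive function is evaluated once per referencing expression is to memoize it. A hedged Python sketch (check here is a stand-in for the poster's UDF, and in Spark the cache would have to live wherever the UDF actually runs):

```python
from functools import lru_cache

calls = {"n": 0}

@lru_cache(maxsize=None)
def check(key):
    """Stand-in for the expensive check() function; the cache ensures
    the body runs only once per distinct key."""
    calls["n"] += 1
    return len(key)  # placeholder computation

b = check("some-key")
abs_value = abs(check("some-key"))  # second reference: served from the cache
# the expensive body ran exactly once
```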
Application jar file not found exception when submitting application
Hi, I have the following shell script that submits the application to the cluster. But whenever I start the application, I encounter a FileNotFoundException; after retrying several times, I can successfully submit it! SPARK=/data/software/spark-1.3.1-bin-2.4.0 APP_HOME=/data/software/spark-1.3.1-bin-2.4.0/applications $SPARK/bin/spark-submit --deploy-mode cluster --name PssAmStreamingApplication --master spark://com-app1:7077 --driver-memory 1G --executor-memory 4G --total-executor-cores 10 --class com.app.PssAmStreamingApplicationDriver $APP_HOME/pss.am.core-1.0-SNAPSHOT-shaded.jar [root@com-app2 applications]# ./submitApplicationStreaming.sh Running Spark using the REST application submission protocol. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/07/06 18:05:35 INFO StandaloneRestClient: Submitting a request to launch an application in spark://com-app1:7077. Warning: Master endpoint spark://com-app1:7077 was not a REST server. Falling back to legacy submission gateway instead. 15/07/06 18:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Sending launch command to spark://com-app1:7077 Driver successfully submitted as driver-20150706180538-0008 ... waiting before polling master for driver state ... 
polling master for driver state State of driver-20150706180538-0008 is ERROR Exception from cluster was: java.io.FileNotFoundException: File file:/data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar does not exist java.io.FileNotFoundException: File file:/data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar does not exist at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:511) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724) at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501) at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397) at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:464) at org.apache.spark.deploy.worker.DriverRunner.org$apache$spark$deploy$worker$DriverRunner$$downloadUserJar(DriverRunner.scala:146) at org.apache.spark.deploy.worker.DriverRunner$$anon$1.run(DriverRunner.scala:72) bit1...@163.com
[SparkR] Float type coercion with hiveContext
Hello, I've got trouble with float type coercion in SparkR with hiveContext.

result <- sql(hiveContext, "SELECT offset, percentage from data limit 100")
show(result)
DataFrame[offset:float, percentage:float]
head(result)
Error in as.data.frame.default(x[[i]], optional = TRUE) : cannot coerce class "jobj" to a data.frame

This looks like an existing issue (SPARK-2863 - Emulate Hive type coercion in native reimplementations of Hive functions) with the same root cause - the native reimplementations of Hive are incomplete, and not only for functions. It looks like a bug. So, has anybody met this issue before?
Re: Application jar file not found exception when submitting application
Before running your script, could you confirm that /data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar exists? You might have forgotten to build this jar. Best Regards, Shixiong Zhu 2015-07-06 18:14 GMT+08:00 bit1...@163.com bit1...@163.com: Hi, I have the following shell script that submits the application to the cluster. But whenever I start the application, I encounter a FileNotFoundException; after retrying several times, I can successfully submit it! SPARK=/data/software/spark-1.3.1-bin-2.4.0 APP_HOME=/data/software/spark-1.3.1-bin-2.4.0/applications $SPARK/bin/spark-submit --deploy-mode cluster --name PssAmStreamingApplication --master spark://com-app1:7077 --driver-memory 1G --executor-memory 4G --total-executor-cores 10 --class com.app.PssAmStreamingApplicationDriver $APP_HOME/pss.am.core-1.0-SNAPSHOT-shaded.jar [root@com-app2 applications]# ./submitApplicationStreaming.sh Running Spark using the REST application submission protocol. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/07/06 18:05:35 INFO StandaloneRestClient: Submitting a request to launch an application in spark://com-app1:7077. Warning: Master endpoint spark://com-app1:7077 was not a REST server. Falling back to legacy submission gateway instead. 15/07/06 18:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Sending launch command to spark://com-app1:7077 Driver successfully submitted as driver-20150706180538-0008 ... waiting before polling master for driver state ... 
polling master for driver state State of driver-20150706180538-0008 is ERROR Exception from cluster was: java.io.FileNotFoundException: File file:/data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar does not exist java.io.FileNotFoundException: File file:/data/software/spark-1.3.1-bin-2.4.0/applications/pss.am.core-1.0-SNAPSHOT-shaded.jar does not exist at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:511) at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724) at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501) at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397) at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:464) at org.apache.spark.deploy.worker.DriverRunner.org$apache$spark$deploy$worker$DriverRunner$$downloadUserJar(DriverRunner.scala:146) at org.apache.spark.deploy.worker.DriverRunner$$anon$1.run(DriverRunner.scala:72) -- bit1...@163.com
Spark's equivalent for Analytical functions in Oracle
Hi there, I would like to check with you whether there is any equivalent of Oracle's analytical functions in Spark SQL. For example, if I have the following data set (table T):

EID|DEPT
101|COMP
102|COMP
103|COMP
104|MARK

In Oracle, I can do something like

select EID, DEPT, count(1) over (partition by DEPT) CNT from T;

to get:

EID|DEPT|CNT
101|COMP|3
102|COMP|3
103|COMP|3
104|MARK|1

Can we do an equivalent query in Spark-SQL? Or what is the best method to get such results in Spark dataframes? Thank you, Gireesh
Re: 1.4.0 regression: out-of-memory errors on small data
I went ahead and tested your file and the results from the tests can be seen in the gist: https://gist.github.com/dennyglee/c933b5ae01c57bd01d94. Basically, when running {Java 7, MaxPermSize = 256} or {Java 8, default} the query ran without any issues. I was able to recreate the issue with {Java 7, default}. I included the commands I used to start the spark-shell, but basically I just used all defaults (no alteration to driver or executor memory), with the only additional call being --driver-class-path to connect to the MySQL Hive metastore. This is on an OSX MacBook Pro. One thing I did notice is that your Java 7 is update 51 while mine is update 79. Could you see if updating to Java 7 update 79 perhaps allows you to use the MaxPermSize call? On Mon, Jul 6, 2015 at 1:36 PM Simeon Simeonov s...@swoop.com wrote: The file is at https://www.dropbox.com/s/a00sd4x65448dl2/apache-spark-failure-data-part-0.gz?dl=1 The command was included in the gist SPARK_REPL_OPTS=-XX:MaxPermSize=256m spark-1.4.0-bin-hadoop2.6/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3 --driver-memory 4g --executor-memory 4g /Sim Simeon Simeonov, Founder CTO, Swoop http://swoop.com/ @simeons http://twitter.com/simeons | blog.simeonov.com | 617.299.6746 From: Yin Huai yh...@databricks.com Date: Monday, July 6, 2015 at 12:59 AM To: Simeon Simeonov s...@swoop.com Cc: Denny Lee denny.g@gmail.com, Andy Huang andy.hu...@servian.com.au, user user@spark.apache.org Subject: Re: 1.4.0 regression: out-of-memory errors on small data I have never seen an issue like this. Setting the PermGen size to 256m should solve the problem. Can you send me your test file and the command used to launch the spark shell or your application? Thanks, Yin On Sun, Jul 5, 2015 at 9:17 PM, Simeon Simeonov s...@swoop.com wrote: Yin, With 512Mb PermGen, the process still hung and had to be kill -9ed. 
At 1Gb the spark shell associated processes stopped hanging and started exiting with scala> println(dfCount.first.getLong(0)) 15/07/06 00:10:07 INFO storage.MemoryStore: ensureFreeSpace(235040) called with curMem=0, maxMem=2223023063 15/07/06 00:10:07 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 229.5 KB, free 2.1 GB) 15/07/06 00:10:08 INFO storage.MemoryStore: ensureFreeSpace(20184) called with curMem=235040, maxMem=2223023063 15/07/06 00:10:08 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 19.7 KB, free 2.1 GB) 15/07/06 00:10:08 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:65464 (size: 19.7 KB, free: 2.1 GB) 15/07/06 00:10:08 INFO spark.SparkContext: Created broadcast 2 from first at <console>:30 java.lang.OutOfMemoryError: PermGen space Stopping spark context. Exception in thread "main" Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main" 15/07/06 00:10:14 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0 on localhost:65464 in memory (size: 19.7 KB, free: 2.1 GB) That did not change up until 4Gb of PermGen space and 8Gb for driver and executor each. I stopped at this point because the exercise started looking silly. It is clear that 1.4.0 is using memory in a substantially different manner. I'd be happy to share the test file so you can reproduce this in your own environment. /Sim Simeon Simeonov, Founder CTO, Swoop http://swoop.com/ @simeons http://twitter.com/simeons | blog.simeonov.com | 617.299.6746 From: Yin Huai yh...@databricks.com Date: Sunday, July 5, 2015 at 11:04 PM To: Denny Lee denny.g@gmail.com Cc: Andy Huang andy.hu...@servian.com.au, Simeon Simeonov s...@swoop.com, user user@spark.apache.org Subject: Re: 1.4.0 regression: out-of-memory errors on small data Sim, Can you increase the PermGen size? Please let me know what your setting is when the problem disappears. 
Thanks, Yin On Sun, Jul 5, 2015 at 5:59 PM, Denny Lee denny.g@gmail.com wrote: I had run into the same problem, where everything was working swimmingly with Spark 1.3.1. When I switched to Spark 1.4, either upgrading to Java 8 (from Java 7) or knocking up the PermGen size solved my issue. HTH! On Mon, Jul 6, 2015 at 8:31 AM Andy Huang andy.hu...@servian.com.au wrote: We have hit the same issue in spark shell when registering a temp table. We observed it happening with those who had JDK 6. The problem went away after installing JDK 8. This was only for the tutorial materials, which were about loading a parquet file. Regards Andy On Sat, Jul 4, 2015 at 2:54 AM, sim s...@swoop.com wrote: @bipin, in my case the error happens immediately in a fresh shell in 1.4.0. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/1-4-0-regression-out-of-memory-errors-on-small-data-tp23595p23614.html Sent from
writing to kafka using spark streaming
I have a requirement to write to a Kafka queue from a Spark Streaming application. I am using Spark 1.2 streaming. Since different executors in Spark are allocated on each run, instantiating a new Kafka producer on each run seems a costly operation. Is there a way to reuse objects in processing executors (not in receivers)?
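A common pattern for this is a lazily initialized singleton, so each executor process builds the producer once and every batch reuses it, typically from inside foreachPartition. A language-agnostic sketch of the pattern in Python (make_producer and ProducerHolder are placeholder names, not a Spark or Kafka API):

```python
created = {"count": 0}

def make_producer():
    """Placeholder for the expensive Kafka producer construction."""
    created["count"] += 1
    return object()

class ProducerHolder:
    """Lazily build one producer per process and reuse it; in Spark this
    would live on the executor and be touched from foreachPartition."""
    _producer = None

    @classmethod
    def get(cls):
        if cls._producer is None:
            cls._producer = make_producer()
        return cls._producer

p1 = ProducerHolder.get()
p2 = ProducerHolder.get()
# p1 is p2, and make_producer ran exactly once
```

In Scala the same idea is usually an `object` holding a lazy val producer, which each executor JVM initializes on first use.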
Re: How to shut down spark web UI?
You can set spark.ui.enabled to false to disable the Web UI. Best Regards, Shixiong Zhu 2015-07-06 17:05 GMT+08:00 luohui20...@sina.com: Hello there, I heard that there is some way to shut down the Spark Web UI; is there a configuration to support this? Thank you. Thanks & Best regards! San.Luo
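For reference, the same switch can be set in spark-defaults.conf or on the command line (a config fragment using the property name from the reply above; MyApp and my-app.jar are placeholders):

```shell
# In conf/spark-defaults.conf:
#   spark.ui.enabled false
# Or per submission:
spark-submit --conf spark.ui.enabled=false --class MyApp my-app.jar
```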
Re: Unable to start spark-sql
Thanks a lot, Akhil On Mon, Jul 6, 2015 at 12:57 PM, sandeep vura sandeepv...@gmail.com wrote: It Works !!! On Mon, Jul 6, 2015 at 12:40 PM, sandeep vura sandeepv...@gmail.com wrote: oK Let me try On Mon, Jul 6, 2015 at 12:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: It's complaining about a missing JDBC driver. Add it to your driver classpath like: ./bin/spark-sql --driver-class-path /home/akhld/sigmoid/spark/lib/mysql-connector-java-5.1.32-bin.jar Thanks Best Regards On Mon, Jul 6, 2015 at 11:42 AM, sandeep vura sandeepv...@gmail.com wrote: Hi Sparkers, I am unable to start the spark-sql service; please check the error mentioned below. Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:101) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72) at 
org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340) ... 9 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:534) at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410) ... 14 more Caused by: javax.jdo.JDOFatalInternalException: Error creating transactional connection factory NestedThrowables: java.lang.reflect.InvocationTargetException at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:587) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:788) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965) at java.security.AccessController.doPrivileged(Native Method) at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960) at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808) at 
javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701) at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:310) at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:339) at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:248) at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:223) at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133) at
Re: Unable to start spark-sql
It Works !!! On Mon, Jul 6, 2015 at 12:40 PM, sandeep vura sandeepv...@gmail.com wrote: oK Let me try On Mon, Jul 6, 2015 at 12:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: It's complaining about a missing JDBC driver. Add it to your driver classpath like: ./bin/spark-sql --driver-class-path /home/akhld/sigmoid/spark/lib/mysql-connector-java-5.1.32-bin.jar Thanks Best Regards On Mon, Jul 6, 2015 at 11:42 AM, sandeep vura sandeepv...@gmail.com wrote: Hi Sparkers, I am unable to start the spark-sql service; please check the error mentioned below. Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:101) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465) 
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340) ... 9 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:534) at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410) ... 14 more Caused by: javax.jdo.JDOFatalInternalException: Error creating transactional connection factory NestedThrowables: java.lang.reflect.InvocationTargetException at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:587) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:788) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965) at java.security.AccessController.doPrivileged(Native Method) at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960) at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701) at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:310) at 
org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:339) at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:248) at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:223) at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133) at org.apache.hadoop.hive.metastore.RawStoreProxy.init(RawStoreProxy.java:58) at
Re: java.lang.IllegalArgumentException: A metric named ... already exists
I have already opened a JIRA about this: https://issues.apache.org/jira/browse/SPARK-8743 On Mon, Jul 6, 2015 at 1:02 AM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi, I haven't been able to reproduce the error reliably; I will open a JIRA as soon as I can. Greetings, Juan 2015-06-23 21:57 GMT+02:00 Tathagata Das t...@databricks.com: Aaah, this could be a potentially major issue, as it may prevent metrics from a restarted streaming context from being published. Can you make it a JIRA? TD On Tue, Jun 23, 2015 at 7:59 AM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi, I'm running a program in Spark 1.4 where several Spark Streaming contexts are created from the same Spark context. As pointed out in https://spark.apache.org/docs/latest/streaming-programming-guide.html, each Spark Streaming context is stopped before creating the next Spark Streaming context. The program works ok, but I get exceptions like the following when a new Spark Streaming context is created: 15/06/23 16:34:51 INFO MetricsSystem: Metrics already registered java.lang.IllegalArgumentException: A metric named local-1435070090627.driver.SampleStreamingTest.StreamingMetrics.streaming.lastReceivedBatch_processingEndTime already exists at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91) at com.codahale.metrics.MetricRegistry.registerAll(MetricRegistry.java:385) at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:85) Is this something to be concerned about, or just a minor nuisance? Thanks a lot in advance. Greetings, Juan Rodriguez Hortala
Re: java.lang.IllegalArgumentException: A metric named ... already exists
Hi, I haven't been able to reproduce the error reliably; I will open a JIRA as soon as I can. Greetings, Juan 2015-06-23 21:57 GMT+02:00 Tathagata Das t...@databricks.com: Aaah, this could be a potentially major issue, as it may prevent metrics from a restarted streaming context from being published. Can you make it a JIRA? TD On Tue, Jun 23, 2015 at 7:59 AM, Juan Rodríguez Hortalá juan.rodriguez.hort...@gmail.com wrote: Hi, I'm running a program in Spark 1.4 where several Spark Streaming contexts are created from the same Spark context. As pointed out in https://spark.apache.org/docs/latest/streaming-programming-guide.html, each Spark Streaming context is stopped before creating the next Spark Streaming context. The program works ok, but I get exceptions like the following when a new Spark Streaming context is created: 15/06/23 16:34:51 INFO MetricsSystem: Metrics already registered java.lang.IllegalArgumentException: A metric named local-1435070090627.driver.SampleStreamingTest.StreamingMetrics.streaming.lastReceivedBatch_processingEndTime already exists at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91) at com.codahale.metrics.MetricRegistry.registerAll(MetricRegistry.java:385) at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:85) Is this something to be concerned about, or just a minor nuisance? Thanks a lot in advance. Greetings, Juan Rodriguez Hortala
Spark-CSV: Multiple delimiters and Null fields support
Hi all, Apparently, we can only specify a single-character delimiter for tokenizing data using Spark-CSV. But what if we have a log file with multiple delimiters, or even a multi-character delimiter? E.g. (field1,field2:field3) with delimiters [,:], and (field1::field2::field3) with a single multi-character delimiter [::]. Further, is there a way to specify null fields? E.g. if the data contains \n in any field, a null should be stored against that field in the DataFrame. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-CSV-Multiple-delimiters-and-Null-fields-support-tp23644.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
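A possible workaround, not a Spark-CSV feature: read the file as plain text, split each line with a regular expression, and build the DataFrame manually. The sketch below assumes Spark 1.3+, an existing SparkContext `sc` and SQLContext `sqlContext`, and a hypothetical three-string-column layout:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A character-class regex handles the mixed delimiters [,:]; for a literal
// multi-character delimiter use java.util.regex.Pattern.quote("::") instead.
val rows = sc.textFile("data.log").map { line =>
  val parts = line.split("[,:]")
  // Store null for fields holding the literal sentinel "\n", per the question.
  Row(parts.map(p => if (p == "\\n") null else p): _*)
}

val schema = StructType(Seq("field1", "field2", "field3")
  .map(name => StructField(name, StringType, nullable = true)))
val df = sqlContext.createDataFrame(rows, schema)
```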
Split RDD into two in a single pass
Hi, I have an RDD that I want to split into two disjoint RDDs using a boolean function. I can do this with the following:

val rdd1 = rdd.filter(f)
val rdd2 = rdd.filter(fnot)

I'm assuming that each of the above statements will traverse the RDD once, thus resulting in 2 passes. Is there a way of doing this in a single pass over the RDD, so that when f returns true the element goes to rdd1, and to rdd2 otherwise? Regards, Anand
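As far as I know there is no built-in transformation that emits two RDDs from one pass. A common workaround, sketched below, is to cache the source RDD so its data is computed only once, even though two filter jobs still scan the cached blocks (the predicate `f` is a placeholder for your own function):

```scala
val cached = rdd.cache()              // source is materialized only once
val rdd1 = cached.filter(f)           // elements where f is true
val rdd2 = cached.filter(x => !f(x))  // everything else
```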
How to shut down spark web UI?
Hello there, I heard that there is some way to shut down the Spark web UI; is there a configuration to support this? Thank you. Thanks & best regards! San.Luo
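One configuration that should achieve this, as a sketch: the `spark.ui.enabled` setting, which must be set before the SparkContext is created (it has no effect on an already-running context):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Disable the web UI for this application.
val conf = new SparkConf()
  .setAppName("no-ui-example")
  .set("spark.ui.enabled", "false")
val sc = new SparkContext(conf)
```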
Re: Spark's equivalent for Analytical functions in Oracle
It's available in Spark 1.4 under DataFrame window operations. Apparently the programming doc does not mention it; you need to look at the APIs.

On Mon, Jul 6, 2015 at 8:50 PM, Gireesh Puthumana gireesh.puthum...@augmentiq.in wrote: Hi there, I would like to check with you whether there are any equivalents of Oracle's analytical functions in Spark SQL. For example, if I have the following data set (table T):

EID|DEPT
101|COMP
102|COMP
103|COMP
104|MARK

In Oracle, I can do something like

select EID, DEPT, count(1) over (partition by DEPT) CNT from T;

to get:

EID|DEPT|CNT
101|COMP|3
102|COMP|3
103|COMP|3
104|MARK|1

Can we do an equivalent query in Spark SQL? Or what is the best method to get such results with Spark DataFrames? Thank you, Gireesh -- Best Regards, Ayan Guha
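A sketch of the Spark 1.4 window API for the example above (assumes a DataFrame `df` with columns EID and DEPT; note that in 1.4, window functions required a HiveContext):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// count(...) over (partition by DEPT), as in the Oracle query.
val w = Window.partitionBy("DEPT")
val result = df.select(col("EID"), col("DEPT"),
  count(col("EID")).over(w).as("CNT"))
```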
kafka offset commit in spark streaming 1.2
In Spark Streaming 1.2, are the offsets of consumed Kafka messages updated in ZooKeeper only after writing to the WAL (when WAL and checkpointing are enabled), or does it depend on the kafkaParams used when initializing the KafkaDStream?

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "ip:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "1");
kafkaParams.put("autocommit.enable", "true");
kafkaParams.put("zookeeper.sync.time.ms", "250");
kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
    kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
    kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));

Here, since I have set autocommit.enable to true, will Spark Streaming ignore this and always call an explicit commitOffset on the high-level consumer connector, or does it depend on the parameter passed? If it depends on the parameter, and the receiver calls an explicit commit only when autocommit is false, then I should override the default autocommit from true to false while enabling WAL, since it may produce duplicates on failure if WAL is enabled and autocommit is true.
RE: kafka offset commit in spark streaming 1.2
If you’re using WAL with Kafka, Spark Streaming will ignore this configuration (autocommit.enable) and explicitly call commitOffset to update the offset in Kafka AFTER the WAL write is done. No matter what you set autocommit.enable to, internally Spark Streaming will set it to false to turn off the autocommit mechanism. Thanks, Jerry

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:11 PM
To: user
Subject: kafka offset commit in spark streaming 1.2

In Spark Streaming 1.2, are the offsets of consumed Kafka messages updated in ZooKeeper only after writing to the WAL (when WAL and checkpointing are enabled), or does it depend on the kafkaParams used when initializing the KafkaDStream?

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "ip:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "1");
kafkaParams.put("autocommit.enable", "true");
kafkaParams.put("zookeeper.sync.time.ms", "250");
kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
    kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
    kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));

Here, since I have set autocommit.enable to true, will Spark Streaming ignore this and always call an explicit commitOffset on the high-level consumer connector, or does it depend on the parameter passed? If it depends on the parameter, and the receiver calls an explicit commit only when autocommit is false, then I should override the default autocommit from true to false while enabling WAL, since it may produce duplicates on failure if WAL is enabled and autocommit is true.
Re: Spark SQL queries hive table, real time ?
Within the context of your question, Spark SQL utilizing the Hive context is primarily about very fast queries. If you want real-time queries, I would utilize Spark Streaming. A couple of great resources on this topic include the Guest Lecture on Spark Streaming in Stanford CME 323: Distributed Algorithms and Optimization http://www.slideshare.net/tathadas/guest-lecture-on-spark-streaming-in-standford and Recipes for Running Spark Streaming Applications in Production https://spark-summit.org/2015/events/recipes-for-running-spark-streaming-applications-in-production/ (from the recent Spark Summit 2015). HTH!

On Mon, Jul 6, 2015 at 3:23 PM spierki florian.spierc...@crisalid.com wrote: Hello, I'm actually asking myself about the performance of using Spark SQL with Hive to do real-time analytics. I know that Hive was created for batch processing, and Spark is used to do fast queries. But will using Spark SQL with Hive allow me to do real-time queries? Or will it just make queries faster, but not real time? Should I use another data warehouse, like HBase? Thanks in advance for your time and consideration, Florian -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-queries-hive-table-real-time-tp23642.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: [SparkR] Float type coercion with hiveContext
I used the Spark 1.4.0 binaries from the official site: http://spark.apache.org/downloads.html And I am running it on:
* Hortonworks HDP 2.2.0.0-2041
* with Hive 0.14
* with disabled hooks for Application Timeline Servers (ATSHook) in hive-site.xml (commented hive.exec.failure.hooks, hive.exec.post.hooks, hive.exec.pre.hooks)

On Mon, Jul 6, 2015 at 1:33 PM, huangzheng 1106944...@qq.com wrote: Hi, are you using SparkR with Spark 1.4? How do you build it from the Spark source code?

-- Original message --
From: Evgeny Sinelnikov esinelni...@griddynamics.com
Sent: Monday, July 6, 2015 (Monday), 6:31 PM
To: user user@spark.apache.org
Subject: [SparkR] Float type coercion with hiveContext

Hello, I ran into trouble with float type coercion in SparkR with a hiveContext:

result <- sql(hiveContext, "SELECT offset, percentage from data limit 100")
show(result)
DataFrame[offset:float, percentage:float]
head(result)
Error in as.data.frame.default(x[[i]], optional = TRUE) :
  cannot coerce class "jobj" to a data.frame

This issue looks related to an existing one (SPARK-2863 - Emulate Hive type coercion in native reimplementations of Hive functions), with the same root cause: incomplete native reimplementations of Hive, not only of Hive functions. It looks like a bug. So, has anybody met this issue before?
How Will Spark Execute below Code - Driver and Executors
Hi All, if someone can help me understand which portion of the code below gets executed on the driver and which portion on the executors, it would be a great help. I have to load data from 10 tables and then use that data in various manipulations, and I am using Spark SQL for that. Please let me know if the code below will be executed on the driver or on each executor node. And if I do a join on the DataFrames, will it happen on the executors or the driver?

options.put("dbtable", "(select * from t_table1) as t_table1");
DataFrame t_gsubmember = sqlContext.read().format("jdbc").options(options).load();
t_table1.cache();
options.put("dbtable", "(select * from t_table2) as t_table2");
DataFrame t_sub = sqlContext.read().format("jdbc").options(options).load();
t_table2.cache();
options.put("dbtable", "(select * from t_table3) as t_table3");
DataFrame t_pi = sqlContext.read().format("jdbc").options(options).load();
t_table3.cache();

And so on. Thanks
RE: kafka offset commit in spark streaming 1.2
Please see the inline comments.

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:51 PM
To: Shao, Saisai
Cc: user
Subject: Re: kafka offset commit in spark streaming 1.2

So if WAL is disabled, how can a developer commit offsets explicitly in a Spark Streaming app, since we don't write code that will be executed in the receiver?

I think it is difficult for the user to commit offsets explicitly in the receiver-based Spark Streaming Kafka API. If you want to explicitly commit offsets, you could try the Spark Streaming Kafka direct API, newly added in Spark 1.3+, where you can manage the offsets yourself; it is implemented on top of Kafka's low-level API.

Plus, since offset commitment is asynchronous, is it possible that the last offset is not yet committed when the next stream batch starts on the receiver, so it may get duplicate data?

Yes, it is possible, so the receiver-based Spark Streaming Kafka API cannot guarantee no duplication and no data loss. If you enable WAL, no data loss can be guaranteed, but you will still see duplication. So the best way is to use the Spark Streaming Kafka direct API with your own offset management to ensure exactly-once semantics.

On Mon, Jul 6, 2015 at 6:16 PM, Shao, Saisai saisai.s...@intel.com wrote: If you disable WAL, Spark Streaming itself will not manage anything offset-related. If auto commit is enabled (true), Kafka itself will update offsets in a time-based way; if auto commit is disabled, nothing will call commitOffset and you need to call this API yourself. Also, Kafka's offset commit mechanism is timer-based, so it is asynchronous with respect to replication.

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:30 PM
To: Shao, Saisai
Cc: user
Subject: Re: kafka offset commit in spark streaming 1.2

And what if I disable WAL and use replication of receiver data using StorageLevel.MEMORY_ONLY_2()? Will it commit the offset after replicating the message, or will it use the autocommit.enable value? And if it uses this value, what if autocommit.enable is set to false; then when does the receiver call commitOffset?

On Mon, Jul 6, 2015 at 5:53 PM, Shao, Saisai saisai.s...@intel.com wrote: If you're using WAL with Kafka, Spark Streaming will ignore this configuration (autocommit.enable) and explicitly call commitOffset to update the offset in Kafka AFTER the WAL write is done. No matter what you set autocommit.enable to, internally Spark Streaming will set it to false to turn off the autocommit mechanism. Thanks, Jerry

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:11 PM
To: user
Subject: kafka offset commit in spark streaming 1.2

In Spark Streaming 1.2, are the offsets of consumed Kafka messages updated in ZooKeeper only after writing to the WAL (when WAL and checkpointing are enabled), or does it depend on the kafkaParams used when initializing the KafkaDStream?

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "ip:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "1");
kafkaParams.put("autocommit.enable", "true");
kafkaParams.put("zookeeper.sync.time.ms", "250");
kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
    kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
    kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));

Here, since I have set autocommit.enable to true, will Spark Streaming ignore this and always call an explicit commitOffset on the high-level consumer connector, or does it depend on the parameter passed? If it depends on the parameter, and the receiver calls an explicit commit only when autocommit is false, then I should override the default autocommit from true to false while enabling WAL, since it may produce duplicates on failure if WAL is enabled and autocommit is true.
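A sketch of the direct API mentioned in the reply above (Spark 1.3+); the broker address and topic name are placeholders:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// No receiver and no ZooKeeper-managed offsets: the stream tracks offsets
// itself, and you can persist them wherever you like for exactly-once.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))
```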
RE: kafka offset commit in spark streaming 1.2
If you disable WAL, Spark Streaming itself will not manage anything offset-related. If auto commit is enabled (true), Kafka itself will update offsets in a time-based way; if auto commit is disabled, nothing will call commitOffset and you need to call this API yourself. Also, Kafka's offset commit mechanism is timer-based, so it is asynchronous with respect to replication.

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:30 PM
To: Shao, Saisai
Cc: user
Subject: Re: kafka offset commit in spark streaming 1.2

And what if I disable WAL and use replication of receiver data using StorageLevel.MEMORY_ONLY_2()? Will it commit the offset after replicating the message, or will it use the autocommit.enable value? And if it uses this value, what if autocommit.enable is set to false; then when does the receiver call commitOffset?

On Mon, Jul 6, 2015 at 5:53 PM, Shao, Saisai saisai.s...@intel.com wrote: If you're using WAL with Kafka, Spark Streaming will ignore this configuration (autocommit.enable) and explicitly call commitOffset to update the offset in Kafka AFTER the WAL write is done. No matter what you set autocommit.enable to, internally Spark Streaming will set it to false to turn off the autocommit mechanism. Thanks, Jerry

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:11 PM
To: user
Subject: kafka offset commit in spark streaming 1.2

In Spark Streaming 1.2, are the offsets of consumed Kafka messages updated in ZooKeeper only after writing to the WAL (when WAL and checkpointing are enabled), or does it depend on the kafkaParams used when initializing the KafkaDStream?

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "ip:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "1");
kafkaParams.put("autocommit.enable", "true");
kafkaParams.put("zookeeper.sync.time.ms", "250");
kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
    kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
    kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));

Here, since I have set autocommit.enable to true, will Spark Streaming ignore this and always call an explicit commitOffset on the high-level consumer connector, or does it depend on the parameter passed? If it depends on the parameter, and the receiver calls an explicit commit only when autocommit is false, then I should override the default autocommit from true to false while enabling WAL, since it may produce duplicates on failure if WAL is enabled and autocommit is true.
Re: kafka offset commit in spark streaming 1.2
So if WAL is disabled, how can a developer commit offsets explicitly in a Spark Streaming app, since we don't write code that will be executed in the receiver? Plus, since offset commitment is asynchronous, is it possible that the last offset is not yet committed when the next stream batch starts on the receiver, so it may get duplicate data?

On Mon, Jul 6, 2015 at 6:16 PM, Shao, Saisai saisai.s...@intel.com wrote: If you disable WAL, Spark Streaming itself will not manage anything offset-related. If auto commit is enabled (true), Kafka itself will update offsets in a time-based way; if auto commit is disabled, nothing will call commitOffset and you need to call this API yourself. Also, Kafka's offset commit mechanism is timer-based, so it is asynchronous with respect to replication.

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:30 PM
To: Shao, Saisai
Cc: user
Subject: Re: kafka offset commit in spark streaming 1.2

And what if I disable WAL and use replication of receiver data using StorageLevel.MEMORY_ONLY_2()? Will it commit the offset after replicating the message, or will it use the autocommit.enable value? And if it uses this value, what if autocommit.enable is set to false; then when does the receiver call commitOffset?

On Mon, Jul 6, 2015 at 5:53 PM, Shao, Saisai saisai.s...@intel.com wrote: If you're using WAL with Kafka, Spark Streaming will ignore this configuration (autocommit.enable) and explicitly call commitOffset to update the offset in Kafka AFTER the WAL write is done. No matter what you set autocommit.enable to, internally Spark Streaming will set it to false to turn off the autocommit mechanism. Thanks, Jerry

From: Shushant Arora [mailto:shushantaror...@gmail.com]
Sent: Monday, July 6, 2015 8:11 PM
To: user
Subject: kafka offset commit in spark streaming 1.2

In Spark Streaming 1.2, are the offsets of consumed Kafka messages updated in ZooKeeper only after writing to the WAL (when WAL and checkpointing are enabled), or does it depend on the kafkaParams used when initializing the KafkaDStream?

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "ip:2181");
kafkaParams.put("group.id", "testgroup");
kafkaParams.put("zookeeper.session.timeout.ms", "1");
kafkaParams.put("autocommit.enable", "true");
kafkaParams.put("zookeeper.sync.time.ms", "250");
kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
    kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
    kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));

Here, since I have set autocommit.enable to true, will Spark Streaming ignore this and always call an explicit commitOffset on the high-level consumer connector, or does it depend on the parameter passed? If it depends on the parameter, and the receiver calls an explicit commit only when autocommit is false, then I should override the default autocommit from true to false while enabling WAL, since it may produce duplicates on failure if WAL is enabled and autocommit is true.
Re: writing to kafka using spark streaming
Use foreachPartition, and allocate whatever the costly resource is once per partition.

On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora shushantaror...@gmail.com wrote: I have a requirement to write to a Kafka queue from a Spark Streaming application. I am using Spark 1.2 Streaming. Since different executors in Spark are allocated on each run, instantiating a new Kafka producer on each run seems a costly operation. Is there a way to reuse objects in processing executors (not in receivers)?
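A sketch of that pattern for the Kafka case (producer construction is shown as comments because the exact producer API depends on the Kafka client version in use):

```scala
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // Allocate the costly resource once per partition, not once per record:
    // val producer = new Producer[String, String](producerConfig)
    records.foreach { record =>
      // producer.send(new KeyedMessage("topic", record))
    }
    // producer.close()
  }
}
```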
Converting spark JDBCRDD to DataFrame
Hi all! What is the most efficient way to convert a JdbcRDD to a DataFrame? Any example? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Converting-spark-JDBCRDD-to-DataFrame-tp23647.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
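One option (Spark 1.3+) is to skip JdbcRDD entirely and let the JDBC data source produce the DataFrame directly; the URL, table, and credentials below are placeholders:

```scala
// Load a table through the JDBC data source; Spark handles the
// row-to-DataFrame conversion and schema inference from table metadata.
val df = sqlContext.read.format("jdbc")
  .option("url", "jdbc:mysql://host:3306/db")
  .option("dbtable", "my_table")
  .option("user", "u")
  .option("password", "p")
  .load()
```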
Re: Restarting Spark Streaming Application with new code
You shouldn't rely on being able to restart from a checkpoint after changing code, regardless of whether the change was explicitly related to serialization. If you are relying on checkpoints to hold state, specifically which offsets have been processed, that state will be lost if you can't recover from the checkpoint. After restart the stream will start receiving messages based on the auto.offset.reset setting, either the beginning or the end of the kafka retention. To avoid this, save state in your own data store. On Sat, Jul 4, 2015 at 9:01 PM, Vinoth Chandar vin...@uber.com wrote: Hi, Just looking for some clarity on the below 1.4 documentation. And restarting from earlier checkpoint information of pre-upgrade code cannot be done. The checkpoint information essentially contains serialized Scala/Java/Python objects and trying to deserialize objects with new, modified classes may lead to errors. Does this mean, new code cannot be deployed over the same checkpoints even if there are not any serialization related changes? (in other words, if the new code does not break previous checkpoint code w.r.t serialization, would new deploys work?) In this case, either start the upgraded app with a different checkpoint directory, or delete the previous checkpoint directory. Assuming this applies to metadata data checkpointing, does it mean that effectively all the computed 'state' is gone? If I am reading from Kafka, does the new code start receiving messages from where it left off? Thanks Vinoth
Re: How Will Spark Execute below Code - Driver and Executors
Join happens on the executors. Else Spark would not be much of a distributed computing engine :) Reads happen on the executors too: your options are passed to the executors, and connection objects are created on the executors.

On 6 Jul 2015 22:58, Ashish Soni asoni.le...@gmail.com wrote: Hi All, if someone can help me understand which portion of the code below gets executed on the driver and which portion on the executors, it would be a great help. I have to load data from 10 tables and then use that data in various manipulations, and I am using Spark SQL for that. Please let me know if the code below will be executed on the driver or on each executor node. And if I do a join on the DataFrames, will it happen on the executors or the driver?

options.put("dbtable", "(select * from t_table1) as t_table1");
DataFrame t_gsubmember = sqlContext.read().format("jdbc").options(options).load();
t_table1.cache();
options.put("dbtable", "(select * from t_table2) as t_table2");
DataFrame t_sub = sqlContext.read().format("jdbc").options(options).load();
t_table2.cache();
options.put("dbtable", "(select * from t_table3) as t_table3");
DataFrame t_pi = sqlContext.read().format("jdbc").options(options).load();
t_table3.cache();

And so on. Thanks
Re: DESCRIBE FORMATTED doesn't work in Hive Thrift Server?
What version of Hive and Spark are you using? Cheers

On Sun, Jul 5, 2015 at 10:53 PM, Rex Xiong bycha...@gmail.com wrote: Hi, I try to use it for one table created in Spark, but it seems the results are all empty. I want to get the metadata for the table; what are the other options? Thanks

+---+
| result |
+---+
| # col_name |
| |
| col |
| |
| # Detailed Table Information |
| Database: |
| Owner: |
| CreateTime: |
| LastAccessTime: |
| Protect Mode: |
| Retention: |
| Location: |
| Table Type: |
| Table Parameters: |
| |
| |
| |
| |
| # Storage Information |
| SerDe Library: |
| InputFormat: |
| OutputFormat: |
| Compressed: |
| Num Buckets: |
| Bucket Columns: |
| Sort Columns: |
| Storage Desc Params: |
| |
+---+
User Defined Functions - Execution on Clusters
Hi there, I’m trying to get a feel for how User Defined Functions from SparkSQL (as written in Python and registered using the udf function from pyspark.sql.functions) are run behind the scenes. Trying to grok the source, it seems that the native Python function is serialized for distribution to the clusters. In practice, it seems to be able to check for other variables and functions defined elsewhere in the namespace and include those in the function’s serialization. Following all this though, when actually run, are Python interpreter instances brought up on each node to run the function against the RDDs, or can the serialized function somehow be run on just the JVM? If bringing up Python instances is the execution model, what is the overhead of PySpark UDFs like, compared to those registered in Scala? Thanks, Alek CONFIDENTIALITY NOTICE This message and any included attachments are from Cerner Corporation and are intended only for the addressee. The information contained in this message is confidential and may constitute inside or non-public information under international, federal, or state securities laws. Unauthorized forwarding, printing, copying, distribution, or use of such information is strictly prohibited and may be unlawful. If you are not the addressee, please promptly delete this message and notify the sender of the delivery error by e-mail or you may call Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024.
Re: How to recover in case user errors in streaming
1. onBatchError is not a bad idea. 2. It works for the Kafka direct API and files as well; they all have batches. However, you will not get the number of records for the file stream. 3. Mind giving an example of the exception you would like to see caught? TD

On Wed, Jul 1, 2015 at 10:35 AM, Amit Assudani aassud...@impetus.com wrote: Hi TD, Why don’t we have an onBatchError or similar method in StreamingListener? Also, is StreamingListener only for the receiver-based approach, or does it work for the Kafka direct API / file-based streaming as well? Regards, Amit

From: Tathagata Das t...@databricks.com Date: Monday, June 29, 2015 at 5:24 PM To: amit assudani aassud...@impetus.com Cc: Cody Koeninger c...@koeninger.org, user@spark.apache.org user@spark.apache.org Subject: Re: How to recover in case user errors in streaming

I recommend writing using dstream.foreachRDD, and then rdd.saveAsNewAPIHadoopFile inside try/catch. See the implementation of dstream.saveAsNewAPIHadoopFiles https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L716

On Mon, Jun 29, 2015 at 8:44 AM, Amit Assudani aassud...@impetus.com wrote: Also, how do you suggest catching exceptions while using a connector API like saveAsNewAPIHadoopFiles?

From: amit assudani aassud...@impetus.com Date: Monday, June 29, 2015 at 9:55 AM To: Tathagata Das t...@databricks.com Cc: Cody Koeninger c...@koeninger.org, user@spark.apache.org user@spark.apache.org Subject: Re: How to recover in case user errors in streaming

Thanks TD, this helps. Looking forward to some fix where the framework handles batch failures via callback methods. This will help avoid having to write try/catch in every transformation / action.
Regards, Amit

From: Tathagata Das t...@databricks.com Date: Saturday, June 27, 2015 at 5:14 AM To: amit assudani aassud...@impetus.com Cc: Cody Koeninger c...@koeninger.org, user@spark.apache.org user@spark.apache.org Subject: Re: How to recover in case user errors in streaming

I looked at the code and found that batch exceptions are indeed ignored. This is something that is worth fixing: batch exceptions should not be silently ignored. Also, you can catch failed batch jobs (irrespective of the number of retries) by catching the exception in foreachRDD. Here is an example:

dstream.foreachRDD { rdd =>
  try {
    ...
  } catch {
    ...
  }
}

This will catch failures at the granularity of the job, after all the max retries of a task have been done. But it will be hard to filter out and push the failed record(s) somewhere. To do that, I would use rdd.foreach or rdd.foreachPartition, inside which I would catch the exception and push that record out to another Kafka topic, and continue normal processing of other records. This prevents the task processing the partition from failing (as you are catching the bad records).

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { iterator =>
    // Create Kafka producer for bad records
    iterator.foreach { record =>
      try {
        // process record
      } catch {
        case ExpectedException =>
          // publish bad record to error topic in Kafka using above producer
      }
    }
  }
}

TD PS: Apologies for the Scala examples, hope you get the idea :)

On Fri, Jun 26, 2015 at 9:56 AM, Amit Assudani aassud...@impetus.com wrote: Also, I get TaskContext.get() null when used in the foreach function below (I get it when I use it in map, but the whole point here is to handle something that is breaking in an action). Please help.
:( From: amit assudani aassud...@impetus.com Date: Friday, June 26, 2015 at 11:41 AM To: Cody Koeninger c...@koeninger.org Cc: user@spark.apache.org user@spark.apache.org, Tathagata Das t...@databricks.com Subject: Re: How to recover in case user errors in streaming

Hmm, not sure why, but when I run this code, it always keeps consuming from Kafka and proceeds, ignoring the previously failed batches. Also, now that I get the attempt number from TaskContext and I have the max retries, I am supposed to handle it in the try/catch block; but does that mean I have to handle these kinds of exceptions / errors in every transformation step (map, reduce, transform, etc.)? Isn’t there any callback that says the batch has been retried the max number of times, so that before it is ignored you have a handle to do whatever you want with the batch / message in hand? Regards, Amit

From: Cody Koeninger c...@koeninger.org Date: Friday, June 26, 2015 at 11:32 AM To: amit assudani aassud...@impetus.com Cc: user@spark.apache.org user@spark.apache.org, Tathagata Das t...@databricks.com Subject: Re: How to recover in case user errors in streaming

No, if you have a bad message that you are
Spark standalone cluster - Output file stored in temporary directory in worker
I have a Spark standalone cluster with 2 workers: the master and one slave thread run on a single machine (Machine 1), and another slave runs on a separate machine (Machine 2). I am running a Spark shell on the 2nd machine that reads a file from HDFS, does some calculations on it, and stores the result in HDFS. This is how I read the file in the Spark shell:

val file = sc.textFile("hdfs://localhost:9000/user/root/table.csv")

And this is how I write the result back to a file:

finalRDD.saveAsTextFile("hdfs://localhost:9000/user/root/output_file")

When I run the code, it runs on the cluster and the job succeeds, with each worker processing roughly half of the input file. I can also see the records processed in the web UI. But when I check HDFS on the 2nd machine, I find only one part of the output file. The other part is stored in HDFS on the 1st machine. Even that part is not actually present in the proper HDFS location; instead, it is stored in a _temporary directory.

In Machine 2:
root@worker:~# hadoop fs -ls ./output_file
Found 2 items
-rw-r--r-- 3 root supergroup 0 2015-07-06 16:12 output_file/_SUCCESS
-rw-r--r-- 3 root supergroup 984337 2015-07-06 16:12 output_file/part-0

In Machine 1:
root@spark:~# hadoop fs -ls ./output_file/_temporary/0/task_201507061612_0003_m_01
-rw-r--r-- 3 root supergroup 971824 2015-07-06 16:12 output_file/_temporary/0/task_201507061612_0003_m_01/part-1

I have a couple of questions:
1. Shouldn't both parts be on worker 2 (since the HDFS referred to in saveAsTextFile is the local HDFS)? Or will the output always be split across the workers?
2. Why is the output stored in the _temporary directory on machine 1?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-cluster-Output-file-stored-in-temporary-directory-in-worker-tp23653.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to create empty RDD
This should work val output: RDD[(DetailInputRecord, VISummary)] = sc.parallelize(Seq.empty[(DetailInputRecord, VISummary)]) On Mon, Jul 6, 2015 at 5:11 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: I need to return an empty RDD of type val output: RDD[(DetailInputRecord, VISummary)] This does not work val output: RDD[(DetailInputRecord, VISummary)] = new RDD() as RDD is abstract class. How do i create empty RDD ? -- Deepak
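As a self-contained sketch, with stand-in case classes for the poster's own `DetailInputRecord` and `VISummary` types (both approaches give an empty RDD; `SparkContext.emptyRDD` additionally avoids creating any partitions):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object EmptyRddExample {
  // Stand-ins for the poster's DetailInputRecord and VISummary classes
  case class DetailInputRecord(id: String)
  case class VISummary(count: Long)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("empty-rdd").setMaster("local[*]"))

    // Option 1: parallelize an empty collection
    val viaParallelize: RDD[(DetailInputRecord, VISummary)] =
      sc.parallelize(Seq.empty[(DetailInputRecord, VISummary)])

    // Option 2: SparkContext.emptyRDD, an RDD with no partitions at all
    val viaEmptyRDD: RDD[(DetailInputRecord, VISummary)] =
      sc.emptyRDD[(DetailInputRecord, VISummary)]

    println(viaParallelize.count()) // 0
    println(viaEmptyRDD.count())    // 0
    sc.stop()
  }
}
```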
Spark application with a RESTful API
Hi, I've been researching Spark for a couple of months now, and I strongly believe it can solve our problem. We are developing an application that allows the user to analyze various sources of information. We are dealing with non-technical users, so simply giving them an interactive shell won't do the trick. To allow the users to execute queries, I have considered writing a Spark application that exposes a RESTful API and runs on our cluster. This application will execute the queries on demand on different threads. We need to serve a few thousand users. I should mention that I've looked into Spark Job-Server too; it looks promising, however it's not quite what we are looking for. I wanted to hear your input on this solution, and maybe you can suggest a better one. Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-application-with-a-RESTful-API-tp23654.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
How to create empty RDD
I need to return an empty RDD of type val output: RDD[(DetailInputRecord, VISummary)] This does not work val output: RDD[(DetailInputRecord, VISummary)] = new RDD() as RDD is an abstract class. How do I create an empty RDD? -- Deepak
How to create a LabeledPoint RDD from a Data Frame
Hi, I have a DataFrame which I want to use for creating a RandomForest model using MLlib. The RandomForest model needs an RDD of LabeledPoints. Wondering how I convert the DataFrame to an RDD of LabeledPoint. Regards, Sourav
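A minimal sketch of the conversion, assuming the label sits in the DataFrame's first column and the remaining columns are all numeric features (adjust the column indices to your actual schema):

```scala
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// Map each Row to a LabeledPoint. In Spark 1.x, DataFrame.map returns
// an RDD, so this yields the RDD[LabeledPoint] that RandomForest needs.
def toLabeledPoints(df: DataFrame): RDD[LabeledPoint] = {
  df.map { row =>
    val label = row.getDouble(0) // assumption: column 0 is the label
    val features = Vectors.dense(
      (1 until row.length).map(row.getDouble).toArray) // assumption: rest are numeric
    LabeledPoint(label, features)
  }
}
```

If some columns are not already Double, cast them in the DataFrame first (e.g. with a select and cast) before mapping.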
Re: 1.4.0 regression: out-of-memory errors on small data
You meant SPARK_REPL_OPTS? I did a quick search. Looks like it has been removed since 1.0. I think it did not affect the behavior of the shell. On Mon, Jul 6, 2015 at 9:04 AM, Simeon Simeonov s...@swoop.com wrote: Yin, that did the trick. I'm curious what was the effect of the environment variable, however, as the behavior of the shell changed from hanging to quitting when the env var value got to 1g. /Sim Simeon Simeonov, Founder CTO, Swoop http://swoop.com/ @simeons http://twitter.com/simeons | blog.simeonov.com | 617.299.6746 From: Yin Huai yh...@databricks.com Date: Monday, July 6, 2015 at 11:41 AM To: Denny Lee denny.g@gmail.com Cc: Simeon Simeonov s...@swoop.com, Andy Huang andy.hu...@servian.com.au, user user@spark.apache.org Subject: Re: 1.4.0 regression: out-of-memory errors on small data Hi Sim, I think the right way to set the PermGen Size is through driver extra JVM options, i.e. --conf spark.driver.extraJavaOptions=-XX:MaxPermSize=256m Can you try it? Without this conf, your driver's PermGen size is still 128m. Thanks, Yin On Mon, Jul 6, 2015 at 4:07 AM, Denny Lee denny.g@gmail.com wrote: I went ahead and tested your file and the results from the tests can be seen in the gist: https://gist.github.com/dennyglee/c933b5ae01c57bd01d94. Basically, when running {Java 7, MaxPermSize = 256} or {Java 8, default} the query ran without any issues. I was able to recreate the issue with {Java 7, default}. I included the commands I used to start the spark-shell but basically I just used all defaults (no alteration to driver or executor memory) with the only additional call was with driver-class-path to connect to MySQL Hive metastore. This is on OSX Macbook Pro. One thing I did notice is that your version of Java 7 is version 51 while my version of Java 7 version 79. Could you see if updating to Java 7 version 79 perhaps allows you to use the MaxPermSize call? 
On Mon, Jul 6, 2015 at 1:36 PM Simeon Simeonov s...@swoop.com wrote: The file is at https://www.dropbox.com/s/a00sd4x65448dl2/apache-spark-failure-data-part-0.gz?dl=1 The command was included in the gist SPARK_REPL_OPTS=-XX:MaxPermSize=256m spark-1.4.0-bin-hadoop2.6/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3 --driver-memory 4g --executor-memory 4g /Sim Simeon Simeonov, Founder CTO, Swoop http://swoop.com/ @simeons http://twitter.com/simeons | blog.simeonov.com | 617.299.6746 From: Yin Huai yh...@databricks.com Date: Monday, July 6, 2015 at 12:59 AM To: Simeon Simeonov s...@swoop.com Cc: Denny Lee denny.g@gmail.com, Andy Huang andy.hu...@servian.com.au, user user@spark.apache.org Subject: Re: 1.4.0 regression: out-of-memory errors on small data I have never seen issue like this. Setting PermGen size to 256m should solve the problem. Can you send me your test file and the command used to launch the spark shell or your application? Thanks, Yin On Sun, Jul 5, 2015 at 9:17 PM, Simeon Simeonov s...@swoop.com wrote: Yin, With 512Mb PermGen, the process still hung and had to be kill -9ed. 
At 1Gb the spark shell associated processes stopped hanging and started exiting with scala println(dfCount.first.getLong(0)) 15/07/06 00:10:07 INFO storage.MemoryStore: ensureFreeSpace(235040) called with curMem=0, maxMem=2223023063 15/07/06 00:10:07 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 229.5 KB, free 2.1 GB) 15/07/06 00:10:08 INFO storage.MemoryStore: ensureFreeSpace(20184) called with curMem=235040, maxMem=2223023063 15/07/06 00:10:08 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 19.7 KB, free 2.1 GB) 15/07/06 00:10:08 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:65464 (size: 19.7 KB, free: 2.1 GB) 15/07/06 00:10:08 INFO spark.SparkContext: Created broadcast 2 from first at console:30 java.lang.OutOfMemoryError: PermGen space Stopping spark context. Exception in thread main Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread main 15/07/06 00:10:14 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0 on localhost:65464 in memory (size: 19.7 KB, free: 2.1 GB) That did not change up until 4Gb of PermGen space and 8Gb for driver executor each. I stopped at this point because the exercise started looking silly. It is clear that 1.4.0 is using memory in a substantially different manner. I'd be happy to share the test file so you can reproduce this in your own environment. /Sim Simeon Simeonov, Founder CTO, Swoop http://swoop.com/ @simeons http://twitter.com/simeons | blog.simeonov.com | 617.299.6746 From: Yin Huai yh...@databricks.com Date: Sunday, July 5, 2015 at 11:04 PM To: Denny Lee denny.g@gmail.com Cc: Andy Huang andy.hu...@servian.com.au, Simeon Simeonov s...@swoop.com,
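For reference, Yin's suggested fix expressed as a full launch command. The package and memory flags are carried over from Sim's original command; treat this as a sketch of the shape of the invocation, not a tuned configuration:

```shell
spark-1.4.0-bin-hadoop2.6/bin/spark-shell \
  --conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=256m" \
  --packages com.databricks:spark-csv_2.10:1.0.3 \
  --driver-memory 4g \
  --executor-memory 4g
```

The key difference from the original `SPARK_REPL_OPTS=...` attempt is that `spark.driver.extraJavaOptions` is actually read by spark-submit and applied to the driver JVM, whereas `SPARK_REPL_OPTS` is no longer honored.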
How do we control output part files created by Spark job?
Hi, I have a couple of Spark jobs which process thousands of files every day. File size may vary from MBs to GBs. After finishing a job I usually save using the following code: finalJavaRDD.saveAsParquetFile("/path/in/hdfs"); OR dataFrame.write.format("orc").save("/path/in/hdfs") //storing as ORC file as of Spark 1.4 The Spark job creates plenty of small part files in the final output directory. As far as I understand, Spark creates a part file for each partition/task; please correct me if I am wrong. How do we control the number of part files Spark creates? Finally I would like to create a Hive table using these parquet/orc directories, and I heard Hive is slow when we have a large number of small files. Please guide, I am new to Spark. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
How to call hiveContext.sql() on all the Hive partitions in parallel?
Hi, I have to fire a few insert into queries which use Hive partitions. I have two Hive partition columns named server and date. I execute the insert into queries using hiveContext as shown below, and the queries work fine: hiveContext.sql("insert into summary1 partition(server='a1',date='2015-05-22') select from sourcetbl bla bla") hiveContext.sql("insert into summary2 partition(server='a1',date='2015-05-22') select from sourcetbl bla bla") I want the above queries to be fired across all partitions: the server partition from a1 to a1000, and date will be yesterday's date; this job will run every day over yesterday's date for all partitions. I was thinking of something like this, but am not sure if it is a good approach: DataFrame partitionFrame = hiveContext.sql("show partitions where date='2015-05-07'") partitionFrame.forEach(); // execute above queries inside foreach Will it work in parallel if I use dataframe.foreach and apply the queries to all partitions in parallel? Please guide, I am new to Spark. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-call-hiveContext-sql-on-all-the-Hive-partitions-in-parallel-tp23648.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Converting spark JDBCRDD to DataFrame
Use the built-in JDBC data source: https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases On Mon, Jul 6, 2015 at 6:42 AM, Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi all! What is the most efficient way to convert a jdbcRDD to a DataFrame? Any example? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Converting-spark-JDBCRDD-to-DataFrame-tp23647.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
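A minimal sketch of that data source, reading straight into a DataFrame with no JdbcRDD involved (the host, database, credentials, and table name are all placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(
  new SparkConf().setAppName("jdbc-df").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// Reads the table as a DataFrame directly (Spark 1.3+ DataFrameReader API)
val df = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:mysql://db-host:3306/mydb?user=me&password=secret") // placeholders
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "my_table")
  .load()
```

Note that `dbtable` must be a table name or a parenthesized subquery with an alias, e.g. `"(select * from my_table where x > 0) t"` — not a bare `select` statement, which is what produces the `MySQLSyntaxErrorException ... near 'select * from tempTable WHERE 1=0'` seen elsewhere on this list.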
Re: writing to kafka using spark streaming
Yeah, creating a new producer at the granularity of partitions may not be that costly. On Mon, Jul 6, 2015 at 6:40 AM, Cody Koeninger c...@koeninger.org wrote: Use foreachPartition, and allocate whatever the costly resource is once per partition. On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora shushantaror...@gmail.com wrote: I have a requirement to write in kafka queue from a spark streaming application. I am using spark 1.2 streaming. Since different executors in spark are allocated at each run so instantiating a new kafka producer at each run seems a costly operation .Is there a way to reuse objects in processing executors(not in receivers)?
Cluster sizing for recommendations
Hi, I'm having trouble building a recommender and would appreciate a few pointers. I have 350,000,000 events which are stored in roughly 500,000 S3 files and are formatted as semi-structured JSON. These events are not all relevant to making recommendations. My code is (roughly): case class Event(id: String, eventType: String, line: JsonNode) val raw = sc.textFile(s3n://bucket/path/dt=*/*) // Files stored by Hive-style daily partitions val parsed = raw.map(json = { val obj = (new ObjectMapper()).readTree(json); Event(obj.get(_id).asText, obj.get(event).asText, obj); // Parse events into Event objects, keeping parse JSON around for later step }) val downloads = parsed.filter(_.eventType == download) val ratings = downloads.map(event = { // ... extract userid and assetid (product) from JSON - code elided for brevity ... Rating(userId, assetId, 1) }).repartition(2048) ratings.cache val model = ALS.trainImplicit(ratings, 10, 10, 0.1, 0.8) This gets me to a model in around 20-25 minutes, which is actually pretty impressive. But, to get this far in a reasonable time I need to use a fair amount of compute power. I've found I need something like 16 x c3.4xl AWS instances for the workers (16 cores, 30 GB, SSD storage) and an r3.2xl (8 cores, 60 GB, SSD storage) for the master. Oddly, the cached Rating objects only take a bit under 2GB of RAM. I'm developing in a shell at the moment, started like this: spark-shell --master yarn-client --executor-cores 16 --executor-memory 23G --driver-memory 48G --executor-cores: 16 because workers have 16 cores --executor-memory: 23GB because that's about the most I can safely allocate on a 30GB machine --driver-memory: 48GB to make use of the memory on the driver I found that if I didn't put the driver/master on a big box with lots of RAM I had issues calculating the model, even though the ratings are only taking about 2GB of RAM. I'm also setting spark.driver.maxResultSize to 40GB. 
If I don't repartition, I end up with 500,000 or so partitions (= number of S3 files) and the model doesn't build in any reasonable timescale. Now I've got a model, I'm trying (using 1.4.0-rc1 - I can't upgrade to 1.4.0 yet): val recommendations = model.recommendProductsForUsers(5) recommendations.cache recommendations.first This invariably crashes with various memory errors - typically GC errors, or errors saying that I'm exceeding the spark.akka.frameSize. Increasing this seems to only prolong my agony. I would appreciate any advice you can offer. Whilst I appreciate this requires a fair amount of CPU, it also seems to need an infeasible amount of RAM. To be honest, I probably have far too much because of limitations around how I can size EC2 instances in order to get the CPU I need. But I've been at this for 3 days now and still haven't actually managed to build any recommendations... Thanks in advance, Danny
Re: How do we control output part files created by Spark job?
Try the coalesce function to limit the number of part files. On Mon, Jul 6, 2015 at 1:23 PM kachau umesh.ka...@gmail.com wrote: Hi I am having couple of Spark jobs which processes thousands of files every day. File size may very from MBs to GBs. After finishing job I usually save using the following code finalJavaRDD.saveAsParquetFile(/path/in/hdfs); OR dataFrame.write.format(orc).save(/path/in/hdfs) //storing as ORC file as of Spark 1.4 Spark job creates plenty of small part files in final output directory. As far as I understand Spark creates part file for each partition/task please correct me if I am wrong. How do we control amount of part files Spark creates? Finally I would like to create Hive table using these parquet/orc directory and I heard Hive is slow when we have large no of small files. Please guide I am new to Spark. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
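A sketch of the suggestion, applied to the save calls from the original post (the target count of 10 is arbitrary — pick it based on the output file size you want; `finalJavaRDD` and `dataFrame` are the poster's own variables):

```scala
// RDD level: merge into fewer partitions before saving.
// coalesce avoids a full shuffle by default, so it is cheap but can
// produce unevenly sized files.
finalJavaRDD.coalesce(10).saveAsParquetFile("/path/in/hdfs")

// DataFrame level (Spark 1.4): repartition does a full shuffle,
// which costs more but balances the output file sizes.
dataFrame.repartition(10).write.format("orc").save("/path/in/hdfs")
```

As Mohammed notes elsewhere in the thread, fewer part files also means fewer input splits for downstream jobs, so this trades write-side tidiness against read-side parallelism.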
Re: writing to kafka using spark streaming
What's the difference between foreachPartition vs mapPartitions for a DStream? Both work at partition granularity. One is a transformation and the other is an action, but if I call an action afterwards on mapPartitions also, which one is more efficient and recommended? On Tue, Jul 7, 2015 at 12:21 AM, Tathagata Das t...@databricks.com wrote: Yeah, creating a new producer at the granularity of partitions may not be that costly. On Mon, Jul 6, 2015 at 6:40 AM, Cody Koeninger c...@koeninger.org wrote: Use foreachPartition, and allocate whatever the costly resource is once per partition. On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora shushantaror...@gmail.com wrote: I have a requirement to write in kafka queue from a spark streaming application. I am using spark 1.2 streaming. Since different executors in spark are allocated at each run so instantiating a new kafka producer at each run seems a costly operation .Is there a way to reuse objects in processing executors(not in receivers)?
Re: writing to kafka using spark streaming
Both have same efficiency. The primary difference is that one is a transformation (hence is lazy, and requires another action to actually execute), and the other is an action. But it may be a slightly better design in general to have transformations be purely functional (that is, no external side effect) and all non-functional stuff be actions (e.g., saveAsHadoopFile is an action). On Mon, Jul 6, 2015 at 12:09 PM, Shushant Arora shushantaror...@gmail.com wrote: whats the difference between foreachPartition vs mapPartitions for a Dtstream both works at partition granularity? One is an operation and another is action but if I call an opeartion afterwords mapPartitions also, which one is more efficient and recommeded? On Tue, Jul 7, 2015 at 12:21 AM, Tathagata Das t...@databricks.com wrote: Yeah, creating a new producer at the granularity of partitions may not be that costly. On Mon, Jul 6, 2015 at 6:40 AM, Cody Koeninger c...@koeninger.org wrote: Use foreachPartition, and allocate whatever the costly resource is once per partition. On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora shushantaror...@gmail.com wrote: I have a requirement to write in kafka queue from a spark streaming application. I am using spark 1.2 streaming. Since different executors in spark are allocated at each run so instantiating a new kafka producer at each run seems a costly operation .Is there a way to reuse objects in processing executors(not in receivers)?
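A sketch of Cody's foreachPartition suggestion for the original question. The broker list, topic name, and serializer settings are placeholders, and this assumes the Kafka 0.8.2+ "new" producer API; treat it as an illustration of one-producer-per-partition, not a drop-in implementation:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

def writeToKafka(stream: DStream[String]): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // One producer per partition, not per record: the expensive
      // connection setup is amortized over the whole partition.
      val props = new Properties()
      props.put("bootstrap.servers", "broker1:9092") // placeholder
      props.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      try {
        records.foreach { r =>
          producer.send(new ProducerRecord[String, String]("my-topic", r)) // placeholder topic
        }
      } finally {
        producer.close()
      }
    }
  }
}
```

Writing this as foreachRDD + foreachPartition (an action) rather than mapPartitions keeps the side effect out of the transformation chain, matching TD's design point above.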
Master doesn't start, no logs
Hi, I've been compiling spark 1.4.0 with SBT, from the source tarball available on the official website. I cannot run spark's master, even though I have built and run several other instances of spark on the same machine (spark 1.3, master branch, pre-built 1.4, ...) /starting org.apache.spark.deploy.master.Master, logging to /mnt/spark-1.4.0/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-xx.out failed to launch org.apache.spark.deploy.master.Master: full log in /mnt/spark-1.4.0/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-xx.out/ But the log file is empty. After digging down to ./bin/spark-class, and finally trying to start the master with: ./bin/spark-class org.apache.spark.deploy.master.Master --host 155.99.144.31 I still have the same result. Here is the strace output for this command: http://pastebin.com/bkJVncBm I'm using a 64 bit Xeon, CentOS 6.5, spark 1.4.0, compiled against hadoop 2.5.2 Any idea? :-) -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Master-doesn-t-start-no-logs-tp23651.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Are Spark Streaming RDDs always processed in order?
Yes, RDD of batch t+1 will be processed only after RDD of batch t has been processed. Unless there are errors where the batch completely fails to get processed, in which case the point is moot. Just reinforcing the concept further. Additional information: This is true in the default configuration. You may find references to an undocumented hidden configuration called spark.streaming.concurrentJobs elsewhere in the mailing list. Setting that to more than 1 to get more concurrency (between output ops) *breaks* the above guarantee. TD On Sat, Jul 4, 2015 at 6:53 AM, Michal Čizmazia mici...@gmail.com wrote: I had a similar inquiry, copied below. I was also looking into making an SQS Receiver reliable: http://stackoverflow.com/questions/30809975/reliable-sqs-receiver-for-spark-streaming Hope this helps. -- Forwarded message -- From: Tathagata Das t...@databricks.com Date: 20 June 2015 at 17:21 Subject: Re: Serial batching with Spark Streaming To: Michal Čizmazia mici...@gmail.com Cc: Binh Nguyen Van binhn...@gmail.com, user user@spark.apache.org No it does not. By default, only after all the retries etc related to batch X is done, then batch X+1 will be started. Yes, one RDD per batch per DStream. However, the RDD could be a union of multiple RDDs (e.g. RDDs generated by windowed DStream, or unioned DStream). TD On Fri, Jun 19, 2015 at 3:16 PM, Michal Čizmazia mici...@gmail.com wrote: Thanks Tathagata! I will use *foreachRDD*/*foreachPartition*() instead of *trasform*() then. Does the default scheduler initiate the execution of the *batch X+1* after the *batch X* even if tasks for the* batch X *need to be *retried due to failures*? If not, please could you suggest workarounds and point me to the code? One more thing was not 100% clear to me from the documentation: Is there exactly *1 RDD* published *per a batch interval* in a DStream? On 3 July 2015 at 22:12, khaledh khal...@gmail.com wrote: I'm writing a Spark Streaming application that uses RabbitMQ to consume events. 
One feature of RabbitMQ that I intend to make use of is bulk ack of messages, i.e. no need to ack one-by-one, but only ack the last event in a batch and that would ack the entire batch. Before I commit to doing so, I'd like to know if Spark Streaming always processes RDDs in the same order they arrive in, i.e. if RDD1 arrives before RDD2, is it true that RDD2 will never be scheduled/processed before RDD1 is finished? This is crucial to the ack logic, since if RDD2 can potentially be processed while RDD1 is still being processed, then if I ack the last event in RDD2 that would also ack all events in RDD1, even though they may not have been completely processed yet. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Are-Spark-Streaming-RDDs-always-processed-in-order-tp23616.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Job consistently failing after leftOuterJoin() - oddly sized / non-uniform partitions
Afternoon all, Really loving this project and the community behind it. Thank you all for your hard work. This past week, though, I've been having a hard time getting my first deployed job to run without failing at the same point every time: Right after a leftOuterJoin, most partitions (600 total) are small (1-100MB), while some others are large (3-6GB). The large ones consistently spill 20-60GB into memory, and eventually fail. If I could only get the partitions to be smaller, right out of the leftOuterJoin, it seems like the job would run fine. I've tried trawling through the logs, but it hasn't been very fruitful in finding out what, specifically, is the issue. Cluster setup: * 6 worker nodes (16 cores, 104GB Memory, 500GB storage) * 1 master (same config as above) Running Spark on YARN, with: storage.memoryFraction = .3 --executors = 6 --executor-cores = 12 --executor-memory = kind of confusing due to YARN, but basically in the Spark monitor site's Executors page, it shows each as running with 18.8GB memory, though I know usage is much larger due to YARN managing various pieces. (Total memory available to yarn shows 480GB, with 270GB currently used). Screenshot of the task page: http://i.imgur.com/xG3KdEl.png Code: https://gist.github.com/momer/8bc03c60a639e5c04eda#file-spark-scala-L60 (see line 60 for the relevant area) Any pointers in the right direction, or advice on articles to read, or even debugging / settings advice or recommendations would be extremely helpful. I'll put a bounty on this of $50 donation to the ASF! :D Thank you all for reading (and hopefully replying!), Mo Omer
Re: Random Forest in MLLib
Not yet, though work on this feature has begun (SPARK-5133 https://issues.apache.org/jira/browse/SPARK-5133) On Mon, Jul 6, 2015 at 4:46 PM, Sourav Mazumder sourav.mazumde...@gmail.com wrote: Hi, Is there a way to get variable importance for RandomForest model created using MLLib ? This way one can know among multiple features which are the one contributing the most to the dependent variable. Regards, Sourav
Re: User Defined Functions - Execution on Clusters
Currently, Python UDFs run in Python instances and are MUCH slower than Scala ones (from 10 to 100x). There is a JIRA to improve the performance: https://issues.apache.org/jira/browse/SPARK-8632. Even after that, they will still be much slower than Scala ones (because Python itself is slower and there is overhead in calling into Python). On Mon, Jul 6, 2015 at 12:55 PM, Eskilson,Aleksander alek.eskil...@cerner.com wrote: Hi there, I'm trying to get a feel for how User Defined Functions from SparkSQL (as written in Python and registered using the udf function from pyspark.sql.functions) are run behind the scenes. Trying to grok the source, it seems that the native Python function is serialized for distribution to the clusters. In practice, it seems to be able to check for other variables and functions defined elsewhere in the namespace and include those in the function's serialization. Following all this though, when actually run, are Python interpreter instances on each node brought up to actually run the function against the RDDs, or can the serialized function somehow be run on just the JVM? If bringing up Python instances is the execution model, what is the overhead of PySpark UDFs like as compared to those registered in Scala? Thanks, Alek CONFIDENTIALITY NOTICE This message and any included attachments are from Cerner Corporation and are intended only for the addressee. The information contained in this message is confidential and may constitute inside or non-public information under international, federal, or state securities laws. Unauthorized forwarding, printing, copying, distribution, or use of such information is strictly prohibited and may be unlawful. If you are not the addressee, please promptly delete this message and notify the sender of the delivery error by e-mail or you may call Cerner's corporate offices in Kansas City, Missouri, U.S.A at (+1) (816)221-1024.
How does executor cores change the spark job behavior ?
I have a simple job that reads data => union => filter => map and then counts. 1 Job started, 2402 tasks, read 149G of input. I started the job with different numbers of executors: 1) 1 -- 8.3 mins 2) 2 -- 5.6 mins 3) 3 -- 3.1 mins 1) Why is increasing the cores speeding up this app? 2) I started the job with --num-executors 9973, but when I click the executors tab I see 330 executors. So can I start the job with --num-executors 330, as that is all I get from the YARN cluster? 3) I had set the split size to 64 MB, but when I start the job with --executor-memory 14g, how do I decide how much memory I need? Also, as the cores increase, how do I factor that into the calculations? 4) As the speed is getting better, how far can I go with increasing executors? -- Deepak
Random Forest in MLLib
Hi, Is there a way to get variable importance for RandomForest model created using MLLib ? This way one can know among multiple features which are the one contributing the most to the dependent variable. Regards, Sourav
JVM is not ready after 10 seconds
Hi, I am trying to connect a worker to the master. The spark master is on cloudera manager and I know the master IP address and port number. I downloaded the spark binary for CDH4 on the worker machine and then when I try to invoke the command sc = sparkR.init(master="ip address:port number") I get the following error. sc=sparkR.init(master="spark://10.229.200.250:7377") Launching java with spark-submit command C:\spark-1.4.0\bin/bin/spark-submit.cmd sparkr-shell C:\Users\ASHISH~1\AppData\Local\Temp\Rtmp82kCxH\backend_port4281739d85 Error in sparkR.init(master = "spark://10.229.200.250:7377") : JVM is not ready after 10 seconds In addition: Warning message: running command 'C:\spark-1.4.0\bin/bin/spark-submit.cmd sparkr-shell C:\Users\ASHISH~1\AppData\Local\Temp\Rtmp82kCxH\backend_port4281739d85' had status 127 I am using Windows 7 as the OS on the worker machine and I am invoking sparkR.init() from RStudio. Any help in this regard will be appreciated. Thank you, Ashish Dutt -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JVM-is-not-ready-after-10-seconds-tp23658.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Job consistently failing after leftOuterJoin() - oddly sized / non-uniform partitions
You can bump up number of partition by a parameter in join operator. However you have a data skew problem which you need to resolve using a reasonable partition by function On 7 Jul 2015 08:57, Mohammed Omer beancinemat...@gmail.com wrote: Afternoon all, Really loving this project and the community behind it. Thank you all for your hard work. This past week, though, I've been having a hard time getting my first deployed job to run without failing at the same point every time: Right after a leftOuterJoin, most partitions (600 total) are small (1-100MB), while some others are large (3-6GB). The large ones consistently spill 20-60GB into memory, and eventually fail. If I could only get the partitions to be smaller, right out of the leftOuterJoin, it seems like the job would run fine. I've tried trawling through the logs, but it hasn't been very fruitful in finding out what, specifically, is the issue. Cluster setup: * 6 worker nodes (16 cores, 104GB Memory, 500GB storage) * 1 master (same config as above) Running Spark on YARN, with: storage.memoryFraction = .3 --executors = 6 --executor-cores = 12 --executor-memory = kind of confusing due to YARN, but basically in the Spark monitor site's Executors page, it shows each as running with 18.8GB memory, though I know usage is much larger due to YARN managing various pieces. (Total memory available to yarn shows 480GB, with 270GB currently used). Screenshot of the task page: http://i.imgur.com/xG3KdEl.png Code: https://gist.github.com/momer/8bc03c60a639e5c04eda#file-spark-scala-L60 (see line 60 for the relevant area) Any pointers in the right direction, or advice on articles to read, or even debugging / settings advice or recommendations would be extremely helpful. I'll put a bounty on this of $50 donation to the ASF! :D Thank you all for reading (and hopefully replying!), Mo Omer
RE: How to create a LabeledPoint RDD from a Data Frame
Have you looked at the new Spark ML library? You can use a DataFrame directly with the Spark ML API. https://spark.apache.org/docs/latest/ml-guide.html Mohammed From: Sourav Mazumder [mailto:sourav.mazumde...@gmail.com] Sent: Monday, July 6, 2015 10:29 AM To: user Subject: How to create a LabeledPoint RDD from a Data Frame Hi, I have a Dataframe which I want to use for creating a RandomForest model using MLLib. The RandonForest model needs a RDD with LabeledPoints. Wondering how do I convert the DataFrame to LabeledPointRDD Regards, Sourav
RE: How do we control output part files created by Spark job?
You could repartition the dataframe before saving it. However, that would impact the parallelism of the next jobs that read these files from HDFS. Mohammed -Original Message- From: kachau [mailto:umesh.ka...@gmail.com] Sent: Monday, July 6, 2015 10:23 AM To: user@spark.apache.org Subject: How do we control output part files created by Spark job? Hi I am having couple of Spark jobs which processes thousands of files every day. File size may very from MBs to GBs. After finishing job I usually save using the following code finalJavaRDD.saveAsParquetFile(/path/in/hdfs); OR dataFrame.write.format(orc).save(/path/in/hdfs) //storing as ORC file as of Spark 1.4 Spark job creates plenty of small part files in final output directory. As far as I understand Spark creates part file for each partition/task please correct me if I am wrong. How do we control amount of part files Spark creates? Finally I would like to create Hive table using these parquet/orc directory and I heard Hive is slow when we have large no of small files. Please guide I am new to Spark. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How to shut down spark web UI?
Got it, thanks. Thanks & Best regards! San.Luo - Original Message - From: Shixiong Zhu zsxw...@gmail.com To: 罗辉 luohui20...@sina.com Cc: user user@spark.apache.org Subject: Re: How to shut down spark web UI? Date: 2015-07-06 17:31 You can set spark.ui.enabled to false to disable the Web UI. Best Regards, Shixiong Zhu 2015-07-06 17:05 GMT+08:00 luohui20...@sina.com: Hello there, I heard that there is some way to shut down the Spark web UI; is there a configuration to support this? Thank you. Thanks & Best regards! San.Luo
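For reference, the setting Shixiong mentions can be applied when the context is created; a sketch (the app name is made up):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Disable the web UI for this application only.
val conf = new SparkConf()
  .setAppName("no-ui-app") // hypothetical app name
  .set("spark.ui.enabled", "false")
val sc = new SparkContext(conf)
```

The same key can also be passed on the command line via `--conf spark.ui.enabled=false` or set in spark-defaults.conf.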
Re: Spark standalone cluster - Output file stored in temporary directory in worker
Can you share your Hadoop configuration files please? - etc/hadoop/core-site.xml - etc/hadoop/hdfs-site.xml - etc/hadoop/hadoop-env.sh AFAIK, the following properties should be configured: hadoop.tmp.dir, dfs.namenode.name.dir, dfs.datanode.data.dir and dfs.namenode.checkpoint.dir. Otherwise, an HDFS slave will use its default temporary folder to save blocks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-cluster-Output-file-stored-in-temporary-directory-in-worker-tp23653p23656.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
JVM is not ready after 10 seconds.
Hi, I am trying to connect a worker to the master. The Spark master is on Cloudera Manager and I know the master IP address and port number. I downloaded the Spark binary for CDH4 on the worker machine, and when I try to invoke the command sc = sparkR.init(master="ip address:port number") I get the following error. sc = sparkR.init(master="spark://10.229.200.250:7377") Launching java with spark-submit command C:\spark-1.4.0\bin/bin/spark-submit.cmd sparkr-shell C:\Users\ASHISH~1\AppData\Local\Temp\Rtmp82kCxH\backend_port4281739d85 Error in sparkR.init(master = "spark://10.229.200.250:7377") : JVM is not ready after 10 seconds In addition: Warning message: running command 'C:\spark-1.4.0\bin/bin/spark-submit.cmd sparkr-shell C:\Users\ASHISH~1\AppData\Local\Temp\Rtmp82kCxH\backend_port4281739d85' had status 127 I am using Windows 7 as the OS on the worker machine and I am invoking sparkR.init() from RStudio. Any help in this reference will be appreciated. Thank you, Ashish Dutt
Re: JVM is not ready after 10 seconds
Hello Shivaram, Thank you for your response. Being a novice at this stage can you also tell how to configure or set the execute permission for the spark-submit file? Thank you for your time. Sincerely, Ashish Dutt On Tue, Jul 7, 2015 at 9:21 AM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: When I've seen this error before it has been due to the spark-submit file (i.e. `C:\spark-1.4.0\bin/bin/spark-submit.cmd`) not having execute permissions. You can try to set execute permission and see if it fixes things. Also we have a PR open to fix a related problem at https://github.com/apache/spark/pull/7025. If you can test the PR that will also be very helpful Thanks Shivaram On Mon, Jul 6, 2015 at 6:11 PM, ashishdutt ashish.du...@gmail.com wrote: Hi, I am trying to connect a worker to the master. The spark master is on cloudera manager and I know the master IP address and port number. I downloaded the spark binary for CDH4 on the worker machine and then when I try to invoke the command sc = sparkR.init(master=ip address:port number) I get the following error. sc=sparkR.init(master=spark://10.229.200.250:7377) Launching java with spark-submit command C:\spark-1.4.0\bin/bin/spark-submit.cmd sparkr-shell C:\Users\ASHISH~1\AppData\Local\Temp\Rtmp82kCxH\backend_port4281739d85 Error in sparkR.init(master = spark://10.229.200.250:7377) : JVM is not ready after 10 seconds In addition: Warning message: running command 'C:\spark-1.4.0\bin/bin/spark-submit.cmd sparkr-shell C:\Users\ASHISH~1\AppData\Local\Temp\Rtmp82kCxH\backend_port4281739d85' had status 127 I am using windows 7 as the OS on the worker machine and I am invoking the sparkR.init() from RStudio Any help in this reference will be appreciated Thank you, Ashish Dutt -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JVM-is-not-ready-after-10-seconds-tp23658.html Sent from the Apache Spark User List mailing list archive at Nabble.com. 
Re: Are Spark Streaming RDDs always processed in order?
Great! That's what I gathered from the thread titled "Serial batching with Spark Streaming", but thanks for confirming this again. On 6 July 2015 at 15:31, Tathagata Das t...@databricks.com wrote: Yes, RDD of batch t+1 will be processed only after RDD of batch t has been processed. Unless there are errors where the batch completely fails to get processed, in which case the point is moot. Just reinforcing the concept further. Additional information: This is true in the default configuration. You may find references to an undocumented hidden configuration called spark.streaming.concurrentJobs elsewhere in the mailing list. Setting that to more than 1 to get more concurrency (between output ops) *breaks* the above guarantee. TD On Sat, Jul 4, 2015 at 6:53 AM, Michal Čizmazia mici...@gmail.com wrote: I had a similar inquiry, copied below. I was also looking into making an SQS Receiver reliable: http://stackoverflow.com/questions/30809975/reliable-sqs-receiver-for-spark-streaming Hope this helps. -- Forwarded message -- From: Tathagata Das t...@databricks.com Date: 20 June 2015 at 17:21 Subject: Re: Serial batching with Spark Streaming To: Michal Čizmazia mici...@gmail.com Cc: Binh Nguyen Van binhn...@gmail.com, user user@spark.apache.org No it does not. By default, only after all the retries etc. related to batch X are done will batch X+1 be started. Yes, one RDD per batch per DStream. However, the RDD could be a union of multiple RDDs (e.g. RDDs generated by a windowed DStream, or a unioned DStream). TD On Fri, Jun 19, 2015 at 3:16 PM, Michal Čizmazia mici...@gmail.com wrote: Thanks Tathagata! I will use *foreachRDD*/*foreachPartition*() instead of *transform*() then. Does the default scheduler initiate the execution of the *batch X+1* after the *batch X* even if tasks for the *batch X* need to be *retried due to failures*? If not, please could you suggest workarounds and point me to the code?
One more thing was not 100% clear to me from the documentation: Is there exactly *1 RDD* published *per batch interval* in a DStream? On 3 July 2015 at 22:12, khaledh khal...@gmail.com wrote: I'm writing a Spark Streaming application that uses RabbitMQ to consume events. One feature of RabbitMQ that I intend to make use of is bulk ack of messages, i.e. no need to ack one-by-one, but only ack the last event in a batch and that would ack the entire batch. Before I commit to doing so, I'd like to know if Spark Streaming always processes RDDs in the same order they arrive in, i.e. if RDD1 arrives before RDD2, is it true that RDD2 will never be scheduled/processed before RDD1 is finished? This is crucial to the ack logic, since if RDD2 can potentially be processed while RDD1 is still being processed, then if I ack the last event in RDD2 that would also ack all events in RDD1, even though they may not have been completely processed yet. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Are-Spark-Streaming-RDDs-always-processed-in-order-tp23616.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Why Kryo Serializer is slower than Java Serializer in TeraSort
Hi. Just a few quick comments on your question. If you drill in (click the link of the subtasks) you can get a more detailed view of the tasks. One of the things reported is the time for serialization. If that is your dominant factor, it should be reflected there, right? Are you sure the input data is not getting cached between runs (i.e. does the order of the experiments matter, and did you explicitly flush the operating system memory between runs, etc.)? If you now run the old experiment again, does it take the same amount of time again? Did you validate that the results were actually correct? Hope this helps. Regards, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-Kryo-Serializer-is-slower-than-Java-Serializer-in-TeraSort-tp23621p23659.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
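One more thing worth checking in a Kryo benchmark (an assumption about the setup, not a diagnosis): if the record classes are not registered, Kryo writes the full class name with every object, which can erase its advantage over Java serialization. A sketch, with a hypothetical record type:

```scala
import org.apache.spark.SparkConf

// Hypothetical TeraSort-style record type.
case class SortRecord(key: Array[Byte], value: Array[Byte])

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Registering classes lets Kryo write a small numeric ID instead of
  // the fully-qualified class name for each serialized object.
  .registerKryoClasses(Array(classOf[SortRecord]))
```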
Please add the Chicago Spark Users' Group to the community page
Here's our home page: http://www.meetup.com/Chicago-Spark-Users/ Thanks, Dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com
RE: Spark SQL queries hive table, real time ?
Hi Florian, It depends on a number of factors. How much data are you querying? Where is the data stored (HDD, SSD or DRAM)? What is the file format (Parquet or CSV)? In theory, it is possible to use Spark SQL for real-time queries, but cost increases as the data size grows. If you can store all of your data in memory, then you should be able to query it in real time ☺ On the other extreme, if Spark SQL has to read a terabyte of data from spinning disk, there is no way it can respond in real time. To be fair, no software can read a terabyte of data from HDD in real time; simple laws of physics. Either you spread the reads over a large number of disks and read them in parallel, or you index the data so that your queries don't have to read a terabyte of data from disk. Hope that helps. Mohammed From: Denny Lee [mailto:denny.g@gmail.com] Sent: Monday, July 6, 2015 4:21 AM To: spierki; user@spark.apache.org Subject: Re: Spark SQL queries hive table, real time ? Within the context of your question, Spark SQL utilizing the Hive context is primarily about very fast queries. If you want real-time queries, I would utilize Spark Streaming. A couple of great resources on this topic include Guest Lecture on Spark Streaming in Stanford CME 323: Distributed Algorithms and Optimization http://www.slideshare.net/tathadas/guest-lecture-on-spark-streaming-in-standford and Recipes for Running Spark Streaming Applications in Production https://spark-summit.org/2015/events/recipes-for-running-spark-streaming-applications-in-production/ (from the recent Spark Summit 2015) HTH! On Mon, Jul 6, 2015 at 3:23 PM spierki florian.spierc...@crisalid.com wrote: Hello, I'm actually asking myself about the performance of using Spark SQL with Hive to do real-time analytics. I know that Hive has been created for batch processing, and Spark is used to do fast queries. But will using Spark SQL with Hive allow me to do real-time queries?
Or will it just make queries faster but not real time? Should I use another datawarehouse, like HBase? Thanks in advance for your time and consideration, Florian -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-queries-hive-table-real-time-tp23642.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
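Mohammed's keep-it-in-memory point can be sketched with a HiveContext (the table name is made up):

```scala
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// Pin the table in executor memory; subsequent queries scan cached
// in-memory columnar data instead of re-reading files from HDFS.
hiveContext.cacheTable("events") // hypothetical Hive table
hiveContext.sql("SELECT count(*) FROM events").show()
```

This gives low-latency repeated queries over a dataset that fits in cluster memory; it does not make Spark SQL a streaming system.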
How to debug java.io.OptionalDataException issues
Hi folks, suffering from a pretty strange issue: Is there a way to tell which object is failing to be serialized/deserialized? I have a maven-installed jar that works well when fat-jarred within another, but shows the following stack when marked as provided and copied to the runtime classpath. I'm pretty puzzled and can't find any good way to debug what is causing the unhappiness. 15/07/07 00:24:23 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, osd04.shaka.rum.tn.akamai.com): java.io.OptionalDataException at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1370) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at scala.collection.immutable.$colon$colon.readObject(List.scala:366) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at
scala.collection.immutable.$colon$colon.readObject(List.scala:366) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
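One debugging aid worth trying (a suggestion, not a guaranteed fix): the JVM's extended serialization debug info makes ObjectInputStream report the chain of fields and classes it was reading when the exception was thrown, which usually identifies the offending object. It can be passed to the driver and executors through Spark conf:

```scala
import org.apache.spark.SparkConf

// -Dsun.io.serialization.extendedDebugInfo=true augments java.io
// serialization exceptions with the field/class being read.
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-Dsun.io.serialization.extendedDebugInfo=true")
  .set("spark.driver.extraJavaOptions", "-Dsun.io.serialization.extendedDebugInfo=true")
```

Since the stack shows an OptionalDataException rather than ClassNotFoundException, it may also be worth checking that the provided jar on the executor classpath is the exact same version as the one compiled against.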
Spark Unit tests - RDDBlockId not found
I am running unit tests on Spark 1.3.1 with sbt test and, besides the unit tests being incredibly slow, I keep running into java.lang.ClassNotFoundException: org.apache.spark.storage.RDDBlockId issues. Usually this means a dependency issue, but I wouldn't know from where... Any help is greatly appreciated. My build.sbt:
libraryDependencies ++= Seq(
  "org.scalaz" %% "scalaz-core" % "7.1.2" excludeAll ExclusionRule(organization = "org.slf4j"),
  "com.typesafe.play" %% "play-json" % "2.3.4" excludeAll ExclusionRule(organization = "org.slf4j"),
  "org.apache.spark" %% "spark-core" % "1.3.1" % "provided" withSources() excludeAll (ExclusionRule(organization = "org.slf4j"), ExclusionRule("org.spark-project.akka", "akka-actor_2.10")),
  "org.apache.spark" %% "spark-graphx" % "1.3.1" % "provided" withSources() excludeAll (ExclusionRule(organization = "org.slf4j"), ExclusionRule("org.spark-project.akka", "akka-actor_2.10")),
  "org.apache.cassandra" % "cassandra-all" % "2.1.6",
  "org.apache.cassandra" % "cassandra-thrift" % "2.1.6",
  "com.typesafe.akka" %% "akka-actor" % "2.3.11",
  "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.6" withSources() withJavadoc() excludeAll (ExclusionRule(organization = "org.slf4j"), ExclusionRule(organization = "org.apache.spark"), ExclusionRule(organization = "com.twitter", name = "parquet-hadoop-bundle")),
  "com.github.nscala-time" %% "nscala-time" % "1.2.0" excludeAll ExclusionRule(organization = "org.slf4j") withSources(),
  "com.datastax.spark" %% "spark-cassandra-connector-embedded" % "1.3.0-M2" excludeAll (ExclusionRule(organization = "org.slf4j"), ExclusionRule(organization = "org.apache.spark"), ExclusionRule(organization = "com.twitter", name = "parquet-hadoop-bundle")),
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.3.0-M2" excludeAll (ExclusionRule(organization = "org.slf4j"), ExclusionRule(organization = "org.apache.spark"), ExclusionRule(organization = "com.twitter", name = "parquet-hadoop-bundle")),
  "org.slf4j" % "slf4j-api" % "1.6.1",
  "com.twitter" % "jsr166e" % "1.1.0",
  "org.slf4j" % "slf4j-nop" % "1.6.1" % "test",
  "org.scalatest" %% "scalatest" % "2.2.1" % "test" excludeAll
ExclusionRule(organization = "org.slf4j")
)
and my spark test settings
(spark.kryo.registrator,com.my.spark.MyRegistrator)
(spark.eventLog.dir,)
(spark.driver.memory,16G)
(spark.kryoserializer.buffer.mb,512)
(spark.akka.frameSize,5)
(spark.shuffle.spill,false)
(spark.default.parallelism,8)
(spark.shuffle.consolidateFiles,false)
(spark.serializer,org.apache.spark.serializer.KryoSerializer)
(spark.shuffle.spill.compress,false)
(spark.driver.host,10.10.68.66)
(spark.akka.timeout,300)
(spark.driver.port,55328)
(spark.eventLog.enabled,false)
(spark.cassandra.connection.host,127.0.0.1)
(spark.cassandra.connection.ssl.enabled,false)
(spark.master,local[8])
(spark.cassandra.connection.ssl.trustStore.password,password)
(spark.fileserver.uri,http://10.10.68.66:55329)
(spark.cassandra.auth.username,username)
(spark.local.dir,/tmp/spark)
(spark.app.id,local-1436229075894)
(spark.storage.blockManagerHeartBeatMs,30)
(spark.executor.id,driver)
(spark.cassandra.auth.password,)
(spark.storage.memoryFraction,0.5)
(spark.speculation,false)
(spark.tachyonStore.folderName,spark-8c33e537-3279-4059-8e4d-6902329bb4ca)
(spark.app.name,Count all entries 217885402)
(spark.shuffle.compress,false)
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Unit-tests-RDDBlockId-not-found-tp23657.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: JVM is not ready after 10 seconds
When I've seen this error before it has been due to the spark-submit file (i.e. `C:\spark-1.4.0\bin/bin/spark-submit.cmd`) not having execute permissions. You can try to set execute permission and see if it fixes things. Also we have a PR open to fix a related problem at https://github.com/apache/spark/pull/7025. If you can test the PR that will also be very helpful Thanks Shivaram On Mon, Jul 6, 2015 at 6:11 PM, ashishdutt ashish.du...@gmail.com wrote: Hi, I am trying to connect a worker to the master. The spark master is on cloudera manager and I know the master IP address and port number. I downloaded the spark binary for CDH4 on the worker machine and then when I try to invoke the command sc = sparkR.init(master=ip address:port number) I get the following error. sc=sparkR.init(master=spark://10.229.200.250:7377) Launching java with spark-submit command C:\spark-1.4.0\bin/bin/spark-submit.cmd sparkr-shell C:\Users\ASHISH~1\AppData\Local\Temp\Rtmp82kCxH\backend_port4281739d85 Error in sparkR.init(master = spark://10.229.200.250:7377) : JVM is not ready after 10 seconds In addition: Warning message: running command 'C:\spark-1.4.0\bin/bin/spark-submit.cmd sparkr-shell C:\Users\ASHISH~1\AppData\Local\Temp\Rtmp82kCxH\backend_port4281739d85' had status 127 I am using windows 7 as the OS on the worker machine and I am invoking the sparkR.init() from RStudio Any help in this reference will be appreciated Thank you, Ashish Dutt -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JVM-is-not-ready-after-10-seconds-tp23658.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Spark application with a RESTful API
It is not a bad idea. Many people use this approach. Mohammed -Original Message- From: Sagi r [mailto:stsa...@gmail.com] Sent: Monday, July 6, 2015 1:58 PM To: user@spark.apache.org Subject: Spark application with a RESTful API Hi, I've been researching Spark for a couple of months now, and I strongly believe it can solve our problem. We are developing an application that allows the user to analyze various sources of information. We are dealing with non-technical users, so simply giving them an interactive shell won't do the trick. To allow the users to execute queries, I have considered writing a Spark application that exposes a RESTful API and runs on our cluster. This application will execute the queries on demand on different threads. We need to serve a few thousand users. I should mention that I've looked into Spark Job-Server too; it looks promising, however it's not quite what we are looking for. I wanted to hear your input on this solution, and maybe you can suggest a better one. Thank you. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-application-with-a-RESTful-API-tp23654.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: JVM is not ready after 10 seconds
Hi, These are the settings in my spark-conf file on the worker machine from where I am trying to access the spark server. I think I need to first configure the spark-submit file too, but I do not know how. Can somebody advise me?
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.
# Example:
spark.master 10.229.200.250:7337
# spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
# spark.driver.memory 5g
# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers=one two three
Sincerely, Ashish Dutt On Tue, Jul 7, 2015 at 9:30 AM, Ashish Dutt ashish.du...@gmail.com wrote: Hello Shivaram, Thank you for your response. Being a novice at this stage can you also tell how to configure or set the execute permission for the spark-submit file? Thank you for your time. Sincerely, Ashish Dutt On Tue, Jul 7, 2015 at 9:21 AM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: When I've seen this error before it has been due to the spark-submit file (i.e. `C:\spark-1.4.0\bin/bin/spark-submit.cmd`) not having execute permissions. You can try to set execute permission and see if it fixes things. Also we have a PR open to fix a related problem at https://github.com/apache/spark/pull/7025. If you can test the PR that will also be very helpful Thanks Shivaram On Mon, Jul 6, 2015 at 6:11 PM, ashishdutt ashish.du...@gmail.com wrote: Hi, I am trying to connect a worker to the master. The spark master is on cloudera manager and I know the master IP address and port number. I downloaded the spark binary for CDH4 on the worker machine and then when I try to invoke the command sc = sparkR.init(master=ip address:port number) I get the following error.
sc=sparkR.init(master=spark://10.229.200.250:7377) Launching java with spark-submit command C:\spark-1.4.0\bin/bin/spark-submit.cmd sparkr-shell C:\Users\ASHISH~1\AppData\Local\Temp\Rtmp82kCxH\backend_port4281739d85 Error in sparkR.init(master = spark://10.229.200.250:7377) : JVM is not ready after 10 seconds In addition: Warning message: running command 'C:\spark-1.4.0\bin/bin/spark-submit.cmd sparkr-shell C:\Users\ASHISH~1\AppData\Local\Temp\Rtmp82kCxH\backend_port4281739d85' had status 127 I am using windows 7 as the OS on the worker machine and I am invoking the sparkR.init() from RStudio Any help in this reference will be appreciated Thank you, Ashish Dutt -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/JVM-is-not-ready-after-10-seconds-tp23658.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How do we control output part files created by Spark job?
Hi. Have you tried to repartition the finalRDD before saving? This link might help. http://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter3/save_the_rdd_to_files.html Regards, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-do-we-control-output-part-files-created-by-Spark-job-tp23649p23660.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: how to black list nodes on the cluster
Hi. Have you tried enabling speculative execution? This will allow Spark to run the same sub-task of the job on other available slots when slow tasks are encountered. The relevant params, which can be passed at execution time, are: spark.speculation spark.speculation.interval spark.speculation.multiplier spark.speculation.quantile See https://spark.apache.org/docs/latest/configuration.html under Scheduling. Regards, Gylfi. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-black-list-nodes-on-the-cluster-tp23650p23661.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
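For example, as a sketch (the thresholds here are illustrative, not recommendations):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")
  // Start speculating only once 90% of the tasks in a stage have finished.
  .set("spark.speculation.quantile", "0.90")
  // A task counts as slow when it runs 1.5x longer than the stage median.
  .set("spark.speculation.multiplier", "1.5")
```

Note this works around slow nodes rather than blacklisting them; the slow node still receives new tasks.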
The auxService:spark_shuffle does not exist
I am getting the following error for a simple Spark job. I am running the following command: spark-submit --class org.apache.spark.examples.SparkPi --deploy-mode cluster --master yarn /opt/cloudera/parcels/CDH/lib/spark/lib/spark-examples-1.2.0-cdh5.3.1-hadoop2.5.0-cdh5.3.1.jar but the job doesn't show any progress, just printing the following on the console:
15/07/06 22:18:41 INFO Client: Application report for application_1436234808473_0017 (state: RUNNING)
15/07/06 22:18:42 INFO Client: Application report for application_1436234808473_0017 (state: RUNNING)
15/07/06 22:18:43 INFO Client: Application report for application_1436234808473_0017 (state: RUNNING)
15/07/06 22:18:44 INFO Client: Application report for application_1436234808473_0017 (state: RUNNING)
15/07/06 22:18:45 INFO Client: Application report for application_1436234808473_0017 (state: RUNNING)
15/07/06 22:18:46 INFO Client: Application report for application_1436234808473_0017 (state: RUNNING)
15/07/06 22:18:47 INFO Client: Application report for application_1436234808473_0017 (state: RUNNING)
15/07/06 22:18:48 INFO Client: Application report for application_1436234808473_0017 (state: RUNNING)
15/07/06 22:18:49 INFO Client: Application report for application_1436234808473_0017 (state: RUNNING)
15/07/06 22:18:50 INFO Client: Application report for application_1436234808473_0017 (state: RUNNING)
Then I had to kill the job, and looking into the logs I found the following: Exception in thread ContainerLauncher #4 java.lang.Error: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106) at org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:206) at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:110) at org.apache.spark.deploy.yarn.ExecutorRunnable.run(ExecutorRunnable.scala:65) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) ... 2 more Exception in thread ContainerLauncher #5 java.lang.Error: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:168) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106) at 
org.apache.hadoop.yarn.client.api.impl.NMClientImpl.startContainer(NMClientImpl.java:206) at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:110) at org.apache.spark.deploy.yarn.ExecutorRunnable.run(ExecutorRunnable.scala:65) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) ... 2 more Don't know why this is happening. Does anyone know what's wrong here? This started happening after a Cloudera Manager upgrade from CM 5.3.1 to CM 5.4.3. We are on CDH 5.3.1. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/The-auxService-spark-shuffle-does-not-exist-tp23662.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException in spark with mysql database
Try including an alias in the query: val query = "(select * from " + table + ") a" On Mon, Jul 6, 2015 at 3:38 AM Hafiz Mujadid hafizmujadi...@gmail.com wrote: Hi! I am trying to load data from my MySQL database using the following code val query = "select * from " + table val url = "jdbc:mysql://" + dataBaseHost + ":" + dataBasePort + "/" + dataBaseName + "?user=" + db_user + "&password=" + db_pass val sc = new SparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]")) val sqlContext = new SQLContext(sc) val options = new HashMap[String, String]() options.put("driver", "com.mysql.jdbc.Driver") options.put("url", url) options.put("dbtable", query) options.put("numPartitions", "1") sqlContext.load("jdbc", options) And I get the following exception Exception in thread "main" com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'select * from tempTable WHERE 1=0' -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/com-mysql-jdbc-exceptions-jdbc4-MySQLSyntaxErrorException-in-spark-with-mysql-database-tp23643.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
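Putting the alias fix together with the original snippet, a sketch (table, url, and sqlContext as in the original post):

```scala
// The JDBC dbtable option is substituted where a table name is expected
// (e.g. "SELECT * FROM <dbtable> WHERE 1=0" for schema discovery), so a
// subquery must be wrapped in parentheses and given an alias.
val query = "(select * from " + table + ") a"

val df = sqlContext.load("jdbc", Map(
  "driver"        -> "com.mysql.jdbc.Driver",
  "url"           -> url,
  "dbtable"       -> query,
  "numPartitions" -> "1"))
```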
Re: Please add the Chicago Spark Users' Group to the community page
Hey Dean, Sure, will take care of this. HTH, Denny On Tue, Jul 7, 2015 at 10:07 Dean Wampler deanwamp...@gmail.com wrote: Here's our home page: http://www.meetup.com/Chicago-Spark-Users/ Thanks, Dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com
Re: How to recover in case user errors in streaming
Hi Cody and TD, Just trying to understand this under the hood, but I cannot find any place for this specific logic: once you reach max failures the whole stream will stop. If possible, could you point me in the right direction? My understanding is that the exception thrown from the job would not be caught but would cascade to the upper thread which created the StreamingContext, and would end the driver process. So one failing job (after 4 retries) would end the whole stream?

    private class JobHandler(job: Job) extends Runnable with Logging {
      def run() {
        ...
        try {
          eventLoop.post(JobStarted(job))
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          eventLoop.post(JobCompleted(job))
        } finally {
          ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
          ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
        }
      }
    }

Thanks, Zhichao On Fri, Jun 26, 2015 at 11:16 PM, Cody Koeninger c...@koeninger.org wrote: If you're consistently throwing exceptions and thus failing tasks, once you reach max failures the whole stream will stop. It's up to you to either catch those exceptions, or restart your stream appropriately once it stops. Keep in mind that if you're relying on checkpoints, and fixing the error requires changing your code, you may not be able to recover the checkpoint. On Fri, Jun 26, 2015 at 9:05 AM, Amit Assudani aassud...@impetus.com wrote: Problem: how do we recover from user errors (connectivity issues / storage service down / etc.)? 
Environment: Spark streaming using Kafka Direct Streams. Code Snippet:

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "localhost:9092");
    kafkaParams.put("auto.offset.reset", "smallest");
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topicsSet);
    JavaDStream<String> inputStream = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
        if (!rdd.isEmpty()) {
          rdd.foreach(new VoidFunction<String>() {
            @Override
            public void call(String arg0) throws Exception {
              System.out.println("rdd--" + arg0);
              Thread.sleep(1000);
              throw new Exception(":::user and/or service exception::" + arg0);
            }
          });
        }
        return null;
      }
    });

Detailed Description: Using Spark streaming I read text messages from Kafka using the direct API. For the sake of simplicity, all I do in processing is print each message to the console and sleep for 1 sec. as a placeholder for actual processing. Assume we get a user error, maybe due to a bad record, a format error, a service connectivity issue, or, say, persistent-store downtime. I've represented that by throwing an Exception from the foreach block. I understand Spark retries this a configurable number of times and proceeds ahead. The question is what happens to those failed messages: does Spark retry them (and if so, when)? If not, does it have any callback method so the user can log / dump them in an error queue and provision them for further analysis and/or manual retries? 
Also, FYI, checkpoints are enabled and the above code is in the create-context method to recover from Spark driver / worker failures. -- NOTE: This message may contain information that is confidential, proprietary, privileged or otherwise protected by law. The message is intended solely for the named addressee. If received in error, please destroy and notify the sender. Any use of this email is prohibited when received in error. Impetus does not represent, warrant and/or guarantee, that the integrity of this communication has been maintained nor that the communication is free of errors, virus, interception or interference.
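Following Cody's earlier advice, one way to keep the stream alive is to catch exceptions inside the output operation yourself and divert bad records to your own error sink; Spark provides no built-in callback for records whose tasks exhausted their retries. A hedged Scala sketch (the `logToErrorQueue` helper and the `process` function are hypothetical stand-ins, not part of the thread's code):

```scala
// Sketch: swallow per-record failures inside foreachRDD so task retries are
// never exhausted and the stream keeps running; failed records go to a
// hypothetical dead-letter sink for later analysis or manual replay.
import org.apache.spark.streaming.dstream.DStream

def logToErrorQueue(record: String, e: Exception): Unit =
  System.err.println(s"failed record: $record (${e.getMessage})") // stand-in sink

def safeOutput(stream: DStream[String])(process: String => Unit): Unit =
  stream.foreachRDD { rdd =>
    rdd.foreach { record =>
      try process(record)                                  // the real work
      catch { case e: Exception => logToErrorQueue(record, e) }
    }
  }
```

With this pattern a bad record costs one catch block rather than four task retries and a stopped StreamingContext.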
Re: writing to kafka using spark streaming
When using foreachPartition, the jobs that get created are not displayed on the driver console but are visible on the web UI. On the driver it prints some stage statistics of the form [Stage 2: (0 + 2) / 5] which then disappear. I am using foreachPartition as:

    kafkaStream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
      public Void call(JavaPairRDD<byte[], byte[]> v1) throws Exception {
        v1.foreachPartition(new VoidFunction<Iterator<Tuple2<byte[], byte[]>>>() {
          public void call(Iterator<Tuple2<byte[], byte[]>> t) throws Exception {
            SparkKafkaProducer producer = SparkKafkaProducer.getInstance();
            while (t.hasNext()) {
              Tuple2<byte[], byte[]> tuple = t.next();
              // create msg after processing tuple._2()
              producer.sendMsg(msg);
            }
          }
        });
        return null;
      }
    });

1. Why are the jobs not displayed on the driver console? Is the call function in the above snippet executed on each worker for each partition?
2. On the web UI too, no job gets displayed when the input source (Kafka queue) does not have any new messages. But when I used mapPartitions, jobs get created and displayed on the web UI as well as the driver for each batch, whether the input has data or not. Is it expected behaviour of foreachPartition that it ignores empty partitions, or does it not even create partitions when the input source is empty?

On Tue, Jul 7, 2015 at 12:44 AM, Tathagata Das t...@databricks.com wrote: Both have the same efficiency. The primary difference is that one is a transformation (hence is lazy, and requires another action to actually execute), and the other is an action. But it may be a slightly better design in general to have transformations be purely functional (that is, no external side effects) and all non-functional stuff be actions (e.g., saveAsHadoopFile is an action). On Mon, Jul 6, 2015 at 12:09 PM, Shushant Arora shushantaror...@gmail.com wrote: What's the difference between foreachPartition vs mapPartitions for a DStream? Both work at partition granularity? 
One is a transformation and the other is an action, but if I call an action afterwards with mapPartitions too, which one is more efficient and recommended? On Tue, Jul 7, 2015 at 12:21 AM, Tathagata Das t...@databricks.com wrote: Yeah, creating a new producer at the granularity of partitions may not be that costly. On Mon, Jul 6, 2015 at 6:40 AM, Cody Koeninger c...@koeninger.org wrote: Use foreachPartition, and allocate whatever the costly resource is once per partition. On Mon, Jul 6, 2015 at 6:11 AM, Shushant Arora shushantaror...@gmail.com wrote: I have a requirement to write to a Kafka queue from a Spark streaming application. I am using Spark 1.2 streaming. Since different executors in Spark are allocated at each run, instantiating a new Kafka producer at each run seems a costly operation. Is there a way to reuse objects in processing executors (not in receivers)?
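The pattern Cody and TD recommend, as a hedged Scala sketch; `SimpleProducer` is a stand-in for whatever costly Kafka client wrapper you actually use (it is not a real library class):

```scala
// Sketch: allocate the costly resource once per partition, not once per record
// and not once on the driver (it would not serialize to the executors).
import org.apache.spark.streaming.dstream.DStream

class SimpleProducer {                        // hypothetical costly resource
  def send(msg: String): Unit = println(msg)  // placeholder for a real send
  def close(): Unit = ()
}

def writeToKafka(stream: DStream[String]): Unit =
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      val producer = new SimpleProducer()     // created once per partition, on the executor
      records.foreach(producer.send)
      producer.close()
    }
  }
```

A singleton (like the SparkKafkaProducer.getInstance() in the snippet above) takes this one step further, reusing the client across batches within the same executor JVM.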
Re: How to create empty RDD
I used val output: RDD[(DetailInputRecord, VISummary)] = sc.emptyRDD[(DetailInputRecord, VISummary)] to create an empty RDD before. Give it a try; it might work for you too. 2015-07-06 14:11 GMT-07:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: I need to return an empty RDD of type val output: RDD[(DetailInputRecord, VISummary)] This does not work: val output: RDD[(DetailInputRecord, VISummary)] = new RDD() as RDD is an abstract class. How do I create an empty RDD? -- Deepak
Unable to start spark-sql
Hi Sparkers, I am unable to start the spark-sql service; please check the error below. Exception in thread "main" java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:101) at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340) ... 
9 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:534) at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410) ... 14 more Caused by: javax.jdo.JDOFatalInternalException: Error creating transactional connection factory NestedThrowables: java.lang.reflect.InvocationTargetException at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:587) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:788) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333) at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:622) at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965) at java.security.AccessController.doPrivileged(Native Method) at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960) at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701) at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:310) at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:339) at 
org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:248) at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:223) at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133) at org.apache.hadoop.hive.metastore.RawStoreProxy.init(RawStoreProxy.java:58) at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:67) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:497) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:475) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:523) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:397) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:356) at
Re: Spark custom streaming receiver not storing data reliably?
Jorn, Thanks for your response. I am pasting below a snippet of code which shows the drools integration. When facts/events are picked up after reading through a file (FileReader -> readLine()), it works as expected, and I have tested it for a wide range of record data in a file. The same code doesn't work when I try to do the same thing on streaming data generated out of the same file. I have used several batch durations, from 1 to 50 seconds. Every execution shows that rules did not fire on some valid facts/events. I also thought of it being an issue with multi-threading, but I have verified that is not the case. Hope this provides you with all the relevant information. Regards, Ajit

    /*
     * Copyright (c) 2015. Capiot Software India Pvt Ltd.
     * Author: a...@capiot.com
     */
    package com.capiot.analytics.spark.file;

    import com.capiot.analytics.spark.Person;
    import com.capiot.analytics.spark.util.KnowledgeBaseHelperUtil;
    import com.capiot.analytics.spark.util.TrackingAgendaEventListener;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.drools.runtime.StatefulKnowledgeSession;
    import java.io.BufferedWriter;
    import java.io.PrintWriter;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    public class RuleExceutionFunction implements VoidFunction<Person> {
        static StatefulKnowledgeSession knowledgeSession;
        static List<Person> customersWithOffers = Collections.synchronizedList(new ArrayList());
        //static Map<Integer, String> map = Collections.synchronizedMap(new HashMap());
        static TrackingAgendaEventListener agendaEventListener = new TrackingAgendaEventListener();
        static AtomicInteger count = new AtomicInteger(0);
        //private static final File f = new File("C:\\Users\\bajit\\Documents\\customerOffers_5k_file.csv");
        private static PrintWriter pw = null;
        private static PrintWriter pwp = null;
        private static final long serialVersionUID = 2370;

        public RuleExceutionFunction() throws Exception {
            if (knowledgeSession == null) {
                knowledgeSession = KnowledgeBaseHelperUtil.getStatefulKnowledgeSession("offers.drl");
                knowledgeSession.addEventListener(agendaEventListener);
                pw = new PrintWriter(new BufferedWriter(new java.io.FileWriter(
                    "C:\\Users\\bajit\\Documents\\customerOffers_file_5k.csv")), true);
                pwp = new PrintWriter(new BufferedWriter(new java.io.FileWriter(
                    "C:\\Users\\bajit\\Documents\\processed_customers_file_5k" + ".csv")), true);
            }
        }

        @Override
        public void call(Person person) throws Exception {
            //List<Person> facts = rdd.collect(); //Apply rules on facts here
            //synchronized (this) {
            knowledgeSession.insert(person);
            int i = knowledgeSession.fireAllRules();
            //}
            //System.out.println("++ '" + agendaEventListener.activationsToString());
            if (person.hasOffer()) {
                customersWithOffers.add(person);
                //Files.append(person.toString() + System.getProperty("line.separator"), f, Charset.defaultCharset());
                pw.println(person.toString());
            }
            pwp.println(person.toString());
            count.getAndIncrement();
        }

        public StatefulKnowledgeSession getSession() { return knowledgeSession; }

        public List<Person> getCustomersWithOffers() { return customersWithOffers; }
    }

On Mon, Jul 6, 2015 at 10:21 AM, Jörn Franke jornfra...@gmail.com wrote: Can you provide the result set you are using and specify how you integrated the drools engine? Drools basically is based on a large shared memory. Hence, if you have several tasks in Spark they end up using different shared memory areas. A full integration of drools requires some sophisticated design and probably rewriting of the rules evaluation algorithm, so you probably have to rewrite that engine from scratch. On Sun, Jul 5, 2015 at 17:42, Ajit Bhingarkar a...@capiot.com wrote: Hi, I am trying to integrate the Drools rules API with Spark so that the solution could solve a few CEP-centric use cases. When I read data from a local file (simple FileReader -> readLine()), I see that all my rules are reliably fired and every time I get the results as expected. 
I have tested with file record sizes from 5K to 5M; the results are as expected, every time. However, when I try to receive the same data through a stream (I created a simple ServerSocket and am reading the file and writing to the socket line by line) using a custom socket receiver, even though I see that the data is received at the custom receiver's end, perhaps the store() API has an issue, and the data is not reliably persisted (I am using StorageLevel.MEMORY_AND_DISK_SER_2() as recommended). The result is that my rules
Re: Spark got stuck with BlockManager after computing connected components using GraphX
If you don't want those logs flooding your screen, you can disable them simply with:

    import org.apache.log4j.{Level, Logger}
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)

Thanks Best Regards On Sun, Jul 5, 2015 at 7:27 PM, Hellen hong.lu.c...@gmail.com wrote: Sorry for the silly question. I'm fairly new to Spark. Because of the cleanup log messages, I didn't see the scala> prompt, so I thought it was still working on something. If I press Enter, I get disconnected. I finally tried typing the variable name, which actually worked. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-got-stuck-with-BlockManager-after-computing-connected-components-using-GraphX-tp23620p23623.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: cores and resource management
Try with spark.cores.max; executor-cores is usually used when you run in YARN mode. Thanks Best Regards On Mon, Jul 6, 2015 at 1:22 AM, nizang ni...@windward.eu wrote: hi, We're running Spark 1.4.0 on EC2, with 6 machines, 4 cores each. We're trying to run an application on a number of total-executor-cores, but we want it to run on as few machines as possible (e.g. total-executor-cores=4, we'll want a single machine; total-executor-cores=12, we'll want 3 machines). I'm running spark-shell with the following command: /root/spark/bin/spark-shell --total-executor-cores X --executor-cores 4 or /root/spark/bin/spark-shell --total-executor-cores X and checked the cores on the Spark UI, and found the following:

    Requested total-executor-cores | Actual cores without --executor-cores | Actual cores with --executor-cores 4
    24                             | 24                                    | 24
    22                             | 22                                    | 16
    20                             | 20                                    | 8
    16                             | 16                                    | 0
    12                             | 12                                    | 0
    8                              | 8                                     | 0
    4                              | 4                                     | 0

Our questions: 1) Why don't we always get the number of cores we asked for when passing the --executor-cores 4 parameter? It seems that the number of cores we actually get is something like max(24-(24-REQ_TOTAL_CORES)*4, 0). 2) How can we get our original request, i.e. the cores on a minimal number of machines? When playing with --executor-cores, we have the problem described in (1), but the cores are on a minimal number of machines. 3) Playing with the parameter spark.deploy.spreadOut didn't seem to help with our request. thanks, nizan -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/cores-and-resource-management-tp23628.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
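For a standalone cluster, the cap Akhil mentions can be set per application, and packing onto fewer machines is controlled on the master side. A hedged sketch (paths and values are examples, not from the thread):

```shell
# Standalone mode: cap the total cores the application may take.
/root/spark/bin/spark-shell --conf spark.cores.max=12

# To pack executors onto as few workers as possible, spark.deploy.spreadOut
# must be set on the *master* (e.g. in conf/spark-defaults.conf on the master
# node) and the master restarted -- setting it per application has no effect:
#   spark.deploy.spreadOut  false
```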
Re: How to use caching in Spark Actions or Output operations?
Hi Sudarshan, As far as I understand your problem, you should take a look at broadcast variables in Spark. Here are the docs: https://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables . Thanks Himanshu -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-caching-in-Spark-Actions-or-Output-operations-tp23549p23641.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: java.io.IOException: No space left on device--regd.
While the job is running, just look in the directory and see what's the root cause of it (is it the logs? is it the shuffle? etc). Here's a few configuration options which you can try: - Disable shuffle spill: spark.shuffle.spill=false (it might end up in OOM) - Enable log rotation:

    sparkConf.set("spark.executor.logs.rolling.strategy", "size")
      .set("spark.executor.logs.rolling.size.maxBytes", "1024")
      .set("spark.executor.logs.rolling.maxRetainedFiles", "3")

Thanks Best Regards On Mon, Jul 6, 2015 at 10:44 AM, Devarajan Srinivasan devathecool1...@gmail.com wrote: Hi, I am trying to run an ETL on Spark which involves an expensive shuffle operation. Basically I require a self-join to be performed on a Spark DataFrame RDD. The job runs fine for around 15 hours, and when the stage (which performs the self-join) is about to complete, I get a java.io.IOException: No space left on device. I initially thought this could be due to spark.local.dir pointing to the /tmp directory, which was configured with 2GB of space; since this job requires expensive shuffles, Spark needs more space to write the shuffle files. Hence I configured spark.local.dir to point to a different directory which has 1TB of space. But I still get the same no space left exception. What could be the root cause of this issue? Thanks in advance. 
Exception stacktrace: java.io.IOException: No space left on device at java.io.FileOutputStream.writeBytes(Native Method) at java.io.FileOutputStream.write(FileOutputStream.java:345) at org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream$$anonfun$write$3.apply$mcV$sp(BlockObjectWriter.scala:87) at org.apache.spark.storage.DiskBlockObjectWriter.org$apache$spark$storage$DiskBlockObjectWriter$$callWithTiming(BlockObjectWriter.scala:229) at org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream.write(BlockObjectWriter.scala:87) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) at org.xerial.snappy.SnappyOutputStream.dump(SnappyOutputStream.java:297) at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:244) at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:99) at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876) at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785) at java.io.ObjectOutputStream.writeNonProxyDesc(ObjectOutputStream.java:1285) at java.io.ObjectOutputStream.writeClassDesc(ObjectOutputStream.java:1230) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1426) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1576) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44) at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:204) at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:370) at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
Re: java.io.IOException: No space left on device--regd.
You can also set these in the spark-env.sh file: export SPARK_WORKER_DIR=/mnt/spark/ export SPARK_LOCAL_DIR=/mnt/spark/ Thanks Best Regards On Mon, Jul 6, 2015 at 12:29 PM, Akhil Das ak...@sigmoidanalytics.com wrote: While the job is running, just look in the directory and see whats the root cause of it (is it the logs? is it the shuffle? etc). ...
How does Spark streaming move data around ?
I know that Spark uses data parallelism over, say, HDFS, optimally running computations on local data (aka data locality). I was wondering how Spark streaming moves data (messages) around, since the data is streamed in as DStreams and is not on a distributed FS like HDFS. Thanks!
Re: Unable to start spark-sql
It's complaining about a missing JDBC driver. Add it to your driver classpath like: ./bin/spark-sql --driver-class-path /home/akhld/sigmoid/spark/lib/mysql-connector-java-5.1.32-bin.jar Thanks Best Regards On Mon, Jul 6, 2015 at 11:42 AM, sandeep vura sandeepv...@gmail.com wrote: Hi Sparkers, I am unable to start spark-sql service please check the error as mentioned below. Exception in thread main java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient ...
Re: Unable to start spark-sql
OK, let me try. On Mon, Jul 6, 2015 at 12:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: It's complaining about a JDBC driver. Add it to your driver classpath like: ./bin/spark-sql --driver-class-path /home/akhld/sigmoid/spark/lib/mysql-connector-java-5.1.32-bin.jar [...]
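For reference, Akhil's fix can also be made persistent. A sketch, assuming a MySQL-backed Hive metastore (the jar path below is illustrative; adjust it to wherever your connector lives):

```shell
# One-off: put the MySQL connector on the driver classpath when launching spark-sql.
./bin/spark-sql --driver-class-path /path/to/mysql-connector-java-5.1.32-bin.jar

# Persistent alternative: add the equivalent line to conf/spark-defaults.conf
# so every spark-sql / spark-shell session picks it up:
#   spark.driver.extraClassPath  /path/to/mysql-connector-java-5.1.32-bin.jar
```

The root cause in the trace above is DataNucleus failing to create the metastore's JDBC connection factory, which is why the driver jar has to be visible to the driver JVM, not just the executors.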
Re: Benchmark results between Flink and Spark
I would guess the opposite is true for highly iterative benchmarks (common in graph processing and data science). Spark has a pretty large overhead per iteration; more optimisations and planning only make this worse. Sure, people have implemented things like Dijkstra's algorithm in Spark (a problem where the number of iterations is bounded by the circumference of the input graph), but all the datasets I've seen it running on had a very small circumference (which is common for e.g. social networks). Take Spark SQL, for example. Catalyst is a really good query optimiser, but it introduces significant overhead. Since Spark has no iterative semantics of its own (unlike Flink), one has to materialise the intermediary DataFrame at each iteration boundary to determine whether a termination criterion has been reached. This causes a huge amount of planning, especially since it looks like Catalyst will try to optimise the dependency graph regardless of caching, and that dependency graph grows with the number of iterations and thus the size of the input dataset. In Flink, on the other hand, you can describe your entire iterative program through transformations without ever calling an action, which means the optimiser only has to do planning once. Just my 2 cents :) Cheers, Jan

On 06 Jul 2015, at 06:10, n...@reactor8.com wrote: Maybe Flink benefits from some of the points they outline here: http://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html Probably if the benchmarks were re-run with the 1.5/Tungsten line, the gap would close a bit (or a lot), with Spark moving towards a similar style of off-heap memory management and more planning optimizations.

From: Jerry Lam [mailto:chiling...@gmail.com] Sent: Sunday, July 5, 2015 6:28 PM To: Ted Yu Cc: Slim Baltagi; user Subject: Re: Benchmark results between Flink and Spark

Hi guys, I just read the paper too. There is not much information regarding why Flink is faster than Spark for data-science-type workloads in the benchmark. It is very difficult to generalize the conclusion of a benchmark, from my point of view. How much experience the author has with Spark in comparison to Flink is one of the immediate questions I have. It would be great if they made the benchmark software available somewhere for other people to experiment with. Just my 2 cents, Jerry

On Sun, Jul 5, 2015 at 4:35 PM, Ted Yu yuzhih...@gmail.com wrote: There was no mention of the versions of Flink and Spark used in the benchmarking. The size of the cluster is quite small. Cheers

On Sun, Jul 5, 2015 at 10:24 AM, Slim Baltagi sbalt...@gmail.com wrote: Hi, Apache Flink outperforms Apache Spark in processing machine learning and graph algorithms and relational queries, but not in batch processing! The results were published in the proceedings of the 18th International Conference, Business Information Systems 2015, Poznań, Poland, June 24-26, 2015. Thanks to our friend Google, Chapter 3, 'Evaluating New Approaches of Big Data Analytics Frameworks' by Norman Spangenberg, Martin Roth and Bogdan Franczyk, is available for preview at http://goo.gl/WocQci on pages 28-37. Enjoy! Slim Baltagi http://www.SparkBigData.com -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Benchmark-results-between-Flink-and-Spark-tp23626.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Benchmark results between Flink and Spark
Sorry, that should be shortest path, and diameter of the graph. I shouldn't write emails before I get my morning coffee...

On 06 Jul 2015, at 09:09, Jan-Paul Bultmann janpaulbultm...@me.com wrote: I would guess the opposite is true for highly iterative benchmarks (common in graph processing and data science). [...]
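To make Jan-Paul's point about iteration boundaries concrete, here is a minimal sketch (names and structure are illustrative, assuming a Spark 1.x `DataFrame` and an existing `SQLContext`) of a driver-side fixpoint loop. Each pass needs an action to test convergence, so Catalyst replans a query whose lineage grows with the iteration count unless intermediate results are cached:

```scala
// Illustrative sketch only: a fixpoint iteration driven from the Spark driver.
// Because Spark has no native iterative semantics, every iteration must run
// an action (here, count) to evaluate the termination criterion, forcing a
// plan/optimise/execute cycle per iteration.
import org.apache.spark.sql.DataFrame

def iterateToFixpoint(step: DataFrame => DataFrame,
                      initial: DataFrame,
                      maxIter: Int): DataFrame = {
  var current = initial
  var converged = false
  var i = 0
  while (!converged && i < maxIter) {
    // cache() truncates recomputation, but the logical plan Catalyst
    // optimises still grows with each iteration.
    val next = step(current).cache()
    // Materialisation at the iteration boundary: an action on every pass.
    converged = next.except(current).count() == 0
    current.unpersist()
    current = next
    i += 1
  }
  current
}
```

In Flink, by contrast, the iteration itself is an operator in the dataflow, so the whole loop is handed to the optimiser once rather than replanned per pass.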
Re: 1.4.0 regression: out-of-memory errors on small data
Hi Sim, I think the right way to set the PermGen size is through the driver's extra JVM options, i.e.

--conf spark.driver.extraJavaOptions=-XX:MaxPermSize=256m

Can you try it? Without this conf, your driver's PermGen size is still 128m. Thanks, Yin

On Mon, Jul 6, 2015 at 4:07 AM, Denny Lee denny.g@gmail.com wrote: I went ahead and tested your file, and the results from the tests can be seen in this gist: https://gist.github.com/dennyglee/c933b5ae01c57bd01d94. Basically, when running {Java 7, MaxPermSize = 256} or {Java 8, default}, the query ran without any issues. I was able to recreate the issue with {Java 7, default}. I included the commands I used to start the spark-shell, but basically I just used all defaults (no alteration to driver or executor memory); the only additional flag was --driver-class-path to connect to the MySQL Hive metastore. This is on an OSX MacBook Pro. One thing I did notice is that your Java 7 is update 51 while mine is update 79. Could you see if updating to Java 7 update 79 perhaps allows you to use the MaxPermSize call?

On Mon, Jul 6, 2015 at 1:36 PM, Simeon Simeonov s...@swoop.com wrote: The file is at https://www.dropbox.com/s/a00sd4x65448dl2/apache-spark-failure-data-part-0.gz?dl=1 The command was included in the gist:

SPARK_REPL_OPTS=-XX:MaxPermSize=256m spark-1.4.0-bin-hadoop2.6/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3 --driver-memory 4g --executor-memory 4g

/Sim Simeon Simeonov, Founder & CTO, Swoop http://swoop.com/ @simeons http://twitter.com/simeons | blog.simeonov.com | 617.299.6746

From: Yin Huai yh...@databricks.com Date: Monday, July 6, 2015 at 12:59 AM To: Simeon Simeonov s...@swoop.com Cc: Denny Lee denny.g@gmail.com, Andy Huang andy.hu...@servian.com.au, user user@spark.apache.org Subject: Re: 1.4.0 regression: out-of-memory errors on small data

I have never seen an issue like this. Setting the PermGen size to 256m should solve the problem. Can you send me your test file and the command used to launch the spark shell or your application? Thanks, Yin

On Sun, Jul 5, 2015 at 9:17 PM, Simeon Simeonov s...@swoop.com wrote: Yin, with 512Mb of PermGen, the process still hung and had to be kill -9'ed. At 1Gb, the spark-shell-associated processes stopped hanging and started exiting with:

scala> println(dfCount.first.getLong(0))
15/07/06 00:10:07 INFO storage.MemoryStore: ensureFreeSpace(235040) called with curMem=0, maxMem=2223023063
15/07/06 00:10:07 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 229.5 KB, free 2.1 GB)
15/07/06 00:10:08 INFO storage.MemoryStore: ensureFreeSpace(20184) called with curMem=235040, maxMem=2223023063
15/07/06 00:10:08 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 19.7 KB, free 2.1 GB)
15/07/06 00:10:08 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:65464 (size: 19.7 KB, free: 2.1 GB)
15/07/06 00:10:08 INFO spark.SparkContext: Created broadcast 2 from first at <console>:30
java.lang.OutOfMemoryError: PermGen space
Stopping spark context.
Exception in thread "main" Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "main"
15/07/06 00:10:14 INFO storage.BlockManagerInfo: Removed broadcast_2_piece0 on localhost:65464 in memory (size: 19.7 KB, free: 2.1 GB)

That did not change up until 4Gb of PermGen space and 8Gb each for driver and executor. I stopped at this point because the exercise started looking silly. It is clear that 1.4.0 is using memory in a substantially different manner. I'd be happy to share the test file so you can reproduce this in your own environment.

/Sim Simeon Simeonov, Founder & CTO, Swoop http://swoop.com/ @simeons http://twitter.com/simeons | blog.simeonov.com | 617.299.6746

From: Yin Huai yh...@databricks.com Date: Sunday, July 5, 2015 at 11:04 PM To: Denny Lee denny.g@gmail.com Cc: Andy Huang andy.hu...@servian.com.au, Simeon Simeonov s...@swoop.com, user user@spark.apache.org Subject: Re: 1.4.0 regression: out-of-memory errors on small data

Sim, can you increase the PermGen size? Please let me know what your setting is when the problem disappears. Thanks, Yin

On Sun, Jul 5, 2015 at 5:59 PM, Denny Lee denny.g@gmail.com wrote: I had run into the same problem, where everything was working swimmingly with Spark 1.3.1. When I switched to Spark 1.4, either upgrading to Java 8 (from Java 7) or knocking up the PermGen size solved my issue. HTH!

On Mon, Jul 6, 2015 at 8:31 AM, Andy Huang andy.hu...@servian.com.au wrote: We have hit the same issue in the spark shell when registering a temp table. We observed it happening with those who had JDK 6. The problem went away after installing JDK 8. This was only for the tutorial materials, which were about loading a parquet
Re: Streaming: updating broadcast variables
Hi James, the code below shows one way to update the broadcast variable seen by the executors:

// ... events stream setup
var startTime = new Date().getTime()
var hashMap = HashMap(1 -> (1, 1), 2 -> (2, 2))
var hashMapBroadcast = stream.context.sparkContext.broadcast(hashMap)
val TWO_MINUTES = 1000 * 60 * 2

// eventStream is a DStream
eventStream.foreachRDD(rdd => {
  // Executed on the driver, not the executors
  if (new Date().getTime() - startTime > TWO_MINUTES) {
    // remove the old broadcast variable
    hashMapBroadcast.unpersist()
    // create a new one
    hashMapBroadcast = stream.context.sparkContext.broadcast(HashMap(1 -> (1, 1000), 2 -> (2, 2000)))
  }
})

val broadcastValuesFromStream = activitiesByVisitKey.map(activity => hashMapBroadcast.value(1))
// should print (1, 1000) after 2 minutes, once updated
broadcastValuesFromStream.print()

Regards, Conor

On Fri, Jul 3, 2015 at 4:24 PM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: You cannot update a broadcast variable; it won't get reflected on the workers.

On Jul 3, 2015 12:18 PM, James Cole ja...@binarism.net wrote: Hi all, I'm filtering a DStream using a function. I need to be able to change this function while the application is running (I'm polling a service to see if a user has changed their filtering). The filter function is a transformation and runs on the workers, so that's where the updates need to go. I'm not sure of the best way to do this. Initially, broadcasting seemed like the way to go: the filter is actually quite large. But I don't think I can update something I've broadcast. I've tried unpersisting and re-creating the broadcast variable, but it became obvious this wasn't updating the reference on the workers. So am I correct in thinking I can't use broadcast variables for this purpose? The next option seems to be: stopping the JavaStreamingContext, creating a new one from the SparkContext, updating the filter function, and re-creating the DStreams (I'm using direct streams from Kafka).
If I re-created the JavaStreamingContext, would the accumulators (which are created from the SparkContext) keep working? (Obviously I'm going to try this soon.) In summary: 1) Can broadcast variables be updated? 2) Is there a better way than re-creating the JavaStreamingContext and DStreams? Thanks, James
Re: 1.4.0 regression: out-of-memory errors on small data
Yin, that did the trick. I'm curious what the effect of the environment variable was, however, as the behavior of the shell changed from hanging to quitting when the env var value got to 1g. /Sim

Simeon Simeonov, Founder & CTO, Swoop http://swoop.com/ @simeons http://twitter.com/simeons | blog.simeonov.com | 617.299.6746

From: Yin Huai yh...@databricks.com Date: Monday, July 6, 2015 at 11:41 AM To: Denny Lee denny.g@gmail.com Cc: Simeon Simeonov s...@swoop.com, Andy Huang andy.hu...@servian.com.au, user user@spark.apache.org Subject: Re: 1.4.0 regression: out-of-memory errors on small data

Hi Sim, I think the right way to set the PermGen size is through the driver's extra JVM options, i.e. --conf spark.driver.extraJavaOptions=-XX:MaxPermSize=256m [...]
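For readers landing on this thread, the working fix boils down to the invocation below. A sketch based on the exchange above: 256m is only a starting point (Sim's workload needed more), PermGen tuning applies to Java 7 and earlier only, and per Yin, `SPARK_REPL_OPTS` is not the knob that sizes the driver JVM's PermGen.

```shell
# Set the driver's PermGen size via the driver's extra JVM options
# (the setting Yin recommends; adjust 256m upward if OOMs persist).
./bin/spark-shell --conf "spark.driver.extraJavaOptions=-XX:MaxPermSize=256m"

# Alternatively, upgrading to Java 8 avoids the issue entirely,
# since Java 8 removed the PermGen space.
```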