Has anybody faced the SPARK-2604 issue regarding applications hanging?
Hi, Has anyone else experienced https://issues.apache.org/jira/browse/SPARK-2604? It is an edge case of misconfiguration in which the executor memory requested is the same as the maximum memory YARN allows. In this situation the application stays in a hung state, and the reason is not logged verbosely enough to be debugged easily. With the fix, the condition is detected and the corresponding reason is logged before the application is failed. I would prefer the fix to be in the open source code; please share your thoughts. Thanks,
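The misconfiguration described above can be sketched as a simple pre-flight check. This is only an illustration under stated assumptions: the function name is hypothetical, and the 384 MB overhead is an assumed figure (the overhead Spark adds to an executor container on YARN depends on the Spark version and on spark.yarn.executor.memoryOverhead). The YARN limit corresponds to yarn.scheduler.maximum-allocation-mb.

```python
def executor_request_fits(executor_memory_mb, yarn_max_allocation_mb, overhead_mb=384):
    """Check whether an executor container request can ever be granted by YARN.

    overhead_mb is an assumption for illustration; the real value depends on
    the Spark version and configuration.
    """
    requested_mb = executor_memory_mb + overhead_mb
    if requested_mb > yarn_max_allocation_mb:
        reason = (
            "container request of %d MB (%d MB executor memory + %d MB overhead) "
            "exceeds the YARN maximum allocation of %d MB"
            % (requested_mb, executor_memory_mb, overhead_mb, yarn_max_allocation_mb)
        )
        return False, reason
    return True, "container request of %d MB fits" % requested_mb


# Executor memory equal to the YARN maximum: the request cannot fit once
# the overhead is added, which is the edge case discussed in this thread.
fits, reason = executor_request_fits(8192, 8192)
```

Running a check like this before submission would surface the reason immediately instead of leaving the application waiting for containers.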
Using one sql query's result inside another sql query
Hi,

I am using HiveContext to run SQL queries inside Spark. I have created a SchemaRDD (let's call it cachedSchema) in my code. If I run a SQL query (Query1) on top of it, it works. But if I refer to Query1's result inside another SQL query, that fails. Note that I have already registered Query1's result as a temp table.

    registerTempTable(cachedSchema)
    Queryresult1 = Query1 using cachedSchema      [ works ]
    registerTempTable(Queryresult1)
    Queryresult2 = Query2 using Queryresult1      [ FAILS ]

Is this expected? Is there any known workaround?

Following is the exception I am receiving:

org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'f1,'f2,'f3,'f4, tree:
Project ['f1,'f2,'f3,'f4]
 Filter ('count 3)
  LowerCaseSchema
   Subquery x
    Project ['F1,'F2,'F3,'F4,'F6,'Count]
     LowerCaseSchema
      Subquery src
       SparkLogicalPlan (ExistingRdd [F1#0,F2#1,F3#2,F4#3,F5#4,F6#5,Count#6], MappedRDD[4] at map at SQLBlock.scala:64)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:72)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$1.applyOrElse(Analyzer.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:70)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:68)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:397)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:397)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan$lzycompute(HiveContext.scala:358)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan(HiveContext.scala:357)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360)
    at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:120)
    at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
    at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:189)
Re: Using one sql query's result inside another sql query
Thanks Cheng.

For the time being, as a workaround, I applied the schema to Queryresult1 and then registered the result as a temp table. Although that works, I am not sure of the performance impact, as it might block some optimisations in some scenarios.

This flow (on Spark 1.1) works:

    registerTempTable(cachedSchema)
    Queryresult1 = Query1 using cachedSchema                  [ works ]
    queryResult1withSchema = hiveContext.applySchema(Queryresult1, Queryresult1.schema)
    registerTempTable(queryResult1withSchema)
    Queryresult2 = Query2 using queryResult1withSchema        [ works ]

On Fri, Sep 26, 2014 at 5:13 PM, Cheng Lian lian.cs@gmail.com wrote:

Hi Twinkle,

The failure is caused by case sensitivity. The temp table actually stores the original un-analyzed logical plan, thus field names remain capital (F1, F2, etc.). I believe this issue has already been fixed by PR #2382 https://github.com/apache/spark/pull/2382. As a workaround, you can use lowercase letters in field names instead.

Cheng

On 9/25/14 1:18 PM, twinkle sachdeva wrote:

Hi,

I am using HiveContext to run SQL queries inside Spark. I have created a SchemaRDD (let's call it cachedSchema) in my code. If I run a SQL query (Query1) on top of it, it works. But if I refer to Query1's result inside another SQL query, that fails. Note that I have already registered Query1's result as a temp table.

    registerTempTable(cachedSchema)
    Queryresult1 = Query1 using cachedSchema      [ works ]
    registerTempTable(Queryresult1)
    Queryresult2 = Query2 using Queryresult1      [ FAILS ]

Is this expected? Is there any known workaround?
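The case-sensitivity explanation given in this thread can be illustrated with a toy model. The sketch below is not Spark's API: `unresolved` and `lowercase_fields` are hypothetical helpers that only mimic how a lowercased attribute such as 'f1 fails to match a field that is still named F1, and how lowercasing the field names before registering the table (the suggested workaround) avoids the mismatch.

```python
def unresolved(requested, available):
    """Return the requested attribute names that have no exact (case-sensitive) match."""
    known = set(available)
    return [name for name in requested if name not in known]


def lowercase_fields(field_names):
    """Workaround from the thread: use lowercase letters in field names."""
    return [name.lower() for name in field_names]


# Field names as they appear in the logical plan of the failing query.
fields = ["F1", "F2", "F3", "F4", "F5", "F6", "Count"]
# Attributes as the second query asks for them after lowercasing.
requested = ["f1", "f2", "f3", "f4"]

unresolved_before = unresolved(requested, fields)
unresolved_after = unresolved(requested, lowercase_fields(fields))
```

With the original capitalised names every requested attribute stays unresolved, matching the "Unresolved attributes: 'f1,'f2,'f3,'f4" message; with lowercase names nothing is left unresolved.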
Regarding Java version requirements in Spark 1.2.0 or upcoming releases
Hi, Can somebody please share the plans regarding Java version support for Apache Spark 1.2.0 and near-future releases? Will Java 8 become the version needed for all features to be supported in Apache Spark 1.2, or will Java 1.7 suffice? Thanks,
Regarding using spark sql with yarn
Hi,

I have been using Spark SQL with YARN. It works fine in yarn-client mode, but in yarn-cluster mode we are facing two issues. Is yarn-cluster mode not recommended for Spark SQL using HiveContext?

Problem #1

We are not able to run any query with even a very simple filtering operation, whereas a plain select x,y,z works.

    select x,y,z from table1                 == works in yarn-client as well as yarn-cluster mode
    select x,y,z from table1 where z 10      == works in yarn-client but NOT in yarn-cluster mode

The exception says "Unsupported Feature", tracing to HiveQL.scala. This is quite strange, since the code that parses the SQL is expected to be the same in both modes.

Problem #2

On another machine, on which we have configured Hive, we are getting this issue:

java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:344)
    at org.apache.spark.sql.hive.HiveContext.init(HiveContext.scala:278)
Caused by: javax.jdo.JDOFatalInternalException: Error creating transactional connection factory
NestedThrowables:
java.lang.reflect.InvocationTargetException
    at org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:587)
    at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:788)
    at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333)
    at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
    at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
    at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
    at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
    at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:309)
    at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:338)
    at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:247)
    at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:222)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.hadoop.hive.metastore.RawStoreProxy.init(RawStoreProxy.java:58)
    at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:67)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:498)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:476)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:524)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:398)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:357)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.init(RetryingHMSHandler.java:54)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:59)
    at org.apache.hadoop.hive.metastore.HiveMetaStore.newHMSHandler(HiveMetaStore.java:4948)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.init(HiveMetaStoreClient.java:171)
    ... 31 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(Unknown Source)
    at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
    at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:325)
    at org.datanucleus.store.AbstractStoreManager.registerConnectionFactory(AbstractStoreManager.java:282)
    at org.datanucleus.store.AbstractStoreManager.init(AbstractStoreManager.java:240)
    at org.datanucleus.store.rdbms.RDBMSStoreManager.init(RDBMSStoreManager.java:286)
    at
Re: Spark can't find jars
Hi,

Try running the following in the Spark folder:

    bin/run-example SparkPi 10

If this runs fine, look at the set of arguments this script passes, and try submitting your application in a similar way.

Thanks,

On Thu, Oct 16, 2014 at 2:59 PM, Christophe Préaud christophe.pre...@kelkoo.com wrote:

Hi,

I have created a JIRA (SPARK-3967 https://issues.apache.org/jira/browse/SPARK-3967), can you please confirm that you are hit by the same issue?

Thanks,
Christophe.

On 15/10/2014 09:49, Christophe Préaud wrote:

Hi Jimmy,

Did you try my patch?

The problem on my side was that hadoop.tmp.dir (in Hadoop's core-site.xml) was not handled properly by Spark when it is set on multiple partitions/disks, i.e.:

    <property>
      <name>hadoop.tmp.dir</name>
      <value>file:/d1/yarn/local,file:/d2/yarn/local,file:/d3/yarn/local,file:/d4/yarn/local,file:/d5/yarn/local,file:/d6/yarn/local,file:/d7/yarn/local</value>
    </property>

Hence, you won't be hit by this bug if your hadoop.tmp.dir is set on one partition only. If your hadoop.tmp.dir is also set on several partitions, I agree that it looks like a bug in Spark.

Christophe.

On 14/10/2014 18:50, Jimmy McErlain wrote:

So the only way that I could make this work was to build a fat jar file as suggested earlier. To me (and I am no expert) it seems like this is a bug. Everything was working for me prior to our upgrade to Spark 1.1 on Hadoop 2.2, but now it seems not to, i.e. packaging my jars locally, then pushing them out to the cluster and pointing them to the corresponding dependent jars.

Sorry I cannot be more help!
J

On Tue, Oct 14, 2014 at 4:59 AM, Christophe Préaud christophe.pre...@kelkoo.com wrote:

Hello,

I have already posted a message with the exact same problem, and proposed a patch (the subject is "Application failure in yarn-cluster mode"). Can you test it, and see if it works for you? I would also be glad if someone could confirm that it is a bug in Spark 1.1.0.

Regards,
Christophe.

On 14/10/2014 03:15, Jimmy McErlain wrote:

BTW this has always worked for me before until we upgraded the cluster to Spark 1.1.1...
J

On Mon, Oct 13, 2014 at 5:39 PM, HARIPRIYA AYYALASOMAYAJULA aharipriy...@gmail.com wrote:

Hello,

Can you check if the jar file is available in the target/scala-2.10 folder? When you use sbt package to make the jar file, that is where the jar file would be located.

The following command works well for me:

    spark-submit --class "Classname" --master yarn-cluster jarfile(with complete path)

Can you try checking with this initially and later add other options?

On Mon, Oct 13, 2014 at 7:36 PM, Jimmy ji...@sellpoints.com wrote:

Having the exact same error with the exact same jar. Do you work for Altiscale? :)
J

On Oct 13, 2014, at 5:33 PM, Andy Srine andy.sr...@gmail.com wrote:

Hi Guys,

Spark rookie here. I am getting a file not found exception on the --jars. This is in yarn cluster mode, and I am running the following command on our recently upgraded Spark 1.1.1 environment.

    ./bin/spark-submit --verbose --master yarn --deploy-mode cluster --class myEngine --driver-memory 1g --driver-library-path /hadoop/share/hadoop/mapreduce/lib/hadoop-lzo-0.4.18-201406111750.jar --executor-memory 5g --executor-cores 5 --jars /home/andy/spark/lib/joda-convert-1.2.jar --queue default --num-executors 4 /home/andy/spark/lib/my-spark-lib_1.0.jar

This is the error I am hitting. Any tips would be much appreciated. The file permissions look fine on my local disk.

14/10/13 22:49:39 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED
14/10/13 22:49:39 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
Exception in thread Driver java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 1.0 failed 4 times, most recent failure: Lost task 3.3 in stage 1.0 (TID 12, 122-67.vb2.company.com): java.io.FileNotFoundException: ./joda-convert-1.2.jar (Permission denied)
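To find out whether a cluster matches the condition described in this thread (hadoop.tmp.dir set on several partitions), one could inspect core-site.xml. The following is a minimal sketch using only the Python standard library; the helper name `hadoop_tmp_dirs` and the shortened sample configuration are assumptions for illustration, not part of Spark or Hadoop.

```python
import xml.etree.ElementTree as ET


def hadoop_tmp_dirs(core_site_xml):
    """Return the directories listed in hadoop.tmp.dir (empty list if unset)."""
    root = ET.fromstring(core_site_xml)
    for prop in root.iter("property"):
        if (prop.findtext("name") or "").strip() == "hadoop.tmp.dir":
            value = prop.findtext("value") or ""
            return [d.strip() for d in value.split(",") if d.strip()]
    return []


# Shortened, hypothetical core-site.xml content in the same shape as the
# property quoted in the thread.
sample = """<configuration>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>file:/d1/yarn/local,file:/d2/yarn/local</value>
  </property>
</configuration>"""

dirs = hadoop_tmp_dirs(sample)
# More than one entry means the setup matches the multi-partition case.
set_on_multiple_partitions = len(dirs) > 1
```

In practice the XML text would be read from the cluster's actual core-site.xml rather than from an inline string.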
Regarding minimum number of partitions while reading data from Hadoop
Hi, In our job, we need to process the data in small chunks, so as to avoid GC pressure and related problems. For this, we are using the old Hadoop API, as it lets us specify parameters like minPartitions. Does anyone know if there is a way to do the same via the new Hadoop API as well? How would that approach differ from the older API? I am somewhat aware of the split size settings, but I do not know whether there is any guarantee that a minimum-number-of-partitions criterion gets satisfied. Any pointers will be of help. Thanks, Twinkle
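As background for the question above, here is a simplified sketch of how the two Hadoop FileInputFormat variants pick a split size. In the old (mapred) API the requested number of splits feeds a goal size; in the new (mapreduce) API there is no split count, only minimum and maximum split sizes (the maximum is configured through mapreduce.input.fileinputformat.split.maxsize in Hadoop 2). The function names are illustrative, and the sketch ignores per-file boundaries and the slop factor Hadoop applies to the last split, so the resulting counts are hints rather than guarantees.

```python
import math


def old_api_split_size(total_size, num_splits, block_size, min_size=1):
    """Old API: goal size derived from the requested number of splits."""
    goal_size = total_size // max(num_splits, 1)
    return max(min_size, min(goal_size, block_size))


def new_api_split_size(block_size, min_size=1, max_size=2**63 - 1):
    """New API: split size bounded by configured minimum and maximum sizes."""
    return max(min_size, min(max_size, block_size))


def max_split_size_for(total_size, wanted_partitions):
    """Maximum split size that yields roughly wanted_partitions splits."""
    return math.ceil(total_size / wanted_partitions)


MB = 1024 * 1024
total = 1024 * MB   # hypothetical 1 GB input
block = 128 * MB    # hypothetical block size

# Old API: asking for 32 splits gives 32 MB splits.
old_size = old_api_split_size(total, 32, block)

# New API: the same effect by capping the split size instead.
cap = max_split_size_for(total, 32)
new_size = new_api_split_size(block, max_size=cap)
```

So with the new API the usual approach is to derive a maximum split size from the input size and the wanted partition count, rather than to pass the count itself.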
Regarding shuffle data file format
Hi, What file format is used to write files during the shuffle write? Does it depend on the Spark shuffle manager or on the output format? Is it possible to change the file format used for shuffle, irrespective of the output format of the file? Thanks, Twinkle
Re: Priority queue in spark
Hi,

Maybe this is what you are looking for: http://spark.apache.org/docs/1.2.0/job-scheduling.html#fair-scheduler-pools

Thanks,

On Mon, Mar 16, 2015 at 8:15 PM, abhi abhishek...@gmail.com wrote:

Hi,

Currently all the jobs in Spark get submitted using a queue. I have a requirement where a submitted job will generate another set of jobs with some priority, which should again be submitted to the Spark cluster based on priority. That is, a job with higher priority should be executed first. Is this feasible?

Any help is appreciated.

Thanks,
Abhi
Re: Priority queue in spark
In that case, having pre-configured pools, but selecting the correct pool at the code level, might do.

On Tue, Mar 17, 2015 at 11:23 AM, abhi abhishek...@gmail.com wrote:

Yes. Each generated job can have a different priority. It is like a recursive function, where in each iteration the generated job will be submitted to the Spark cluster based on its priority. Jobs with a lower priority, or a priority below some threshold, will be discarded.

Thanks,
Abhi

On Mon, Mar 16, 2015 at 10:36 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote:

Hi Abhi,

Do you mean that each task of a job can have a different priority, or that jobs generated via one job can have different priorities?

On Tue, Mar 17, 2015 at 11:04 AM, Mark Hamstra m...@clearstorydata.com wrote:

http://apache-spark-developers-list.1001551.n3.nabble.com/Job-priority-td10076.html#a10079

On Mon, Mar 16, 2015 at 10:26 PM, abhi abhishek...@gmail.com wrote:

If I understand correctly, the above document creates pools for priority, which are static in nature and have to be defined before submitting the job. In my scenario each generated task can have a different priority.

Thanks,
Abhi

On Mon, Mar 16, 2015 at 9:48 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote:

Hi,

Maybe this is what you are looking for: http://spark.apache.org/docs/1.2.0/job-scheduling.html#fair-scheduler-pools

Thanks,

On Mon, Mar 16, 2015 at 8:15 PM, abhi abhishek...@gmail.com wrote:

Hi,

Currently all the jobs in Spark get submitted using a queue. I have a requirement where a submitted job will generate another set of jobs with some priority, which should again be submitted to the Spark cluster based on priority. That is, a job with higher priority should be executed first. Is this feasible?

Any help is appreciated.

Thanks,
Abhi
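The requirement discussed in this thread (generated jobs carry a priority, higher priorities run first, and jobs below a threshold are discarded) can be sketched on the driver side with an ordinary priority queue. The class below is a hypothetical illustration in plain Python, not a Spark API; a job taken from it could then be run in whichever pre-configured fair scheduler pool matches its priority, for example by setting the spark.scheduler.pool local property described on the linked job-scheduling page.

```python
import heapq
import itertools


class PriorityJobQueue:
    """Driver-side queue: highest priority first, low priorities dropped."""

    def __init__(self, threshold):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker keeps insertion order
        self._threshold = threshold

    def submit(self, priority, job):
        """Queue a job; return False if it is discarded for low priority."""
        if priority < self._threshold:
            return False
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-priority, next(self._counter), job))
        return True

    def next_job(self):
        """Return the highest-priority job, or None if the queue is empty."""
        if not self._heap:
            return None
        _, _, job = heapq.heappop(self._heap)
        return job


queue = PriorityJobQueue(threshold=3)
accepted_a = queue.submit(5, "job-a")
accepted_b = queue.submit(1, "job-b")  # below the threshold, discarded
accepted_c = queue.submit(9, "job-c")
order = [queue.next_job(), queue.next_job(), queue.next_job()]
```

Because each generated job can enqueue further jobs, the same queue also covers the recursive case described above.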
Strategy regarding maximum number of executor failures for long running jobs / Spark Streaming jobs
Hi,

In Spark on YARN, there is a property, spark.yarn.max.executor.failures, which controls the maximum number of executor failures an application will survive. If the number of executor failures (due to any reason, such as OOM or machine failure) exceeds this value, the application quits.

For short Spark jobs this looks fine, but for long running jobs, since it does not take the duration into account, it can lead to the same treatment for two different scenarios:

1. Executors failing within 5 minutes.
2. Executors failing sparsely, where at some point even a single executor failure (which the application could have survived) makes the application quit.

I am sending this to the community to hear what kind of behaviour / strategy they think would be suitable for long running Spark jobs or Spark Streaming jobs.

Thanks and Regards,
Twinkle
Re: One of the executor not getting StopExecutor message
Hi,

The operations are not very extensive, and this scenario is not always reproducible. One of the executors starts behaving in this manner. For this particular application we are using 8 cores per executor, and in practice 4 executors are launched on one machine. This machine has a good configuration with respect to the number of cores.

To me it seems to be some Akka communication issue. If I try to take a thread dump of the executor once it appears to be in trouble, the attempt times out.

Can it be something related to spark.akka.threads?

On Fri, Feb 27, 2015 at 3:55 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Most likely, that particular executor is stuck in a GC pause. What operation are you performing? You can try increasing the parallelism if you see that only 1 executor is doing the task.

Thanks
Best Regards

On Fri, Feb 27, 2015 at 11:39 AM, twinkle sachdeva twinkle.sachd...@gmail.com wrote:

Hi,

I am running a Spark application on YARN in cluster mode. One of my executors appears to be in a hung state for a long time, and finally gets killed by the driver. Unlike the other executors, it has not received the StopExecutor message from the driver.
delay between removing the block manager of an executor, and marking that as lost
Hi,

Is there any relation between removing the block manager of an executor and marking that executor as lost? In my setup, even after the block manager is removed (after failing to do some operation), it takes more than 20 minutes to mark the executor as lost.

Following are the logs:

15/03/03 10:26:49 WARN storage.BlockManagerMaster: Failed to remove broadcast 20 with removeFromMaster = true - Ask timed out on [Actor[akka.tcp://sparkExecutor@TMO-DN73:54363/user/BlockManagerActor1#-966525686]] after [3 ms]}
15/03/03 10:27:41 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(1, TMO-DN73, 4) with no recent heart beats: 76924ms exceeds 45000ms
15/03/03 10:27:41 INFO storage.BlockManagerMasterActor: Removing block manager BlockManagerId(1, TMO-DN73, 4)
15/03/03 10:49:10 ERROR cluster.YarnClusterScheduler: Lost executor 1 on TMO-DN73: remote Akka client disassociated

How can I make this happen faster?

Thanks,
Twinkle
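The delay reported above can be read directly off the log timestamps. As a small worked example (the helper name `seconds_between` is hypothetical, and the sketch assumes every log line starts with a `yy/MM/dd HH:mm:ss` timestamp as in the lines quoted in this message):

```python
from datetime import datetime

# Timestamp layout at the start of each log line, e.g. "15/03/03 10:27:41".
LOG_TIME_FORMAT = "%y/%m/%d %H:%M:%S"


def seconds_between(earlier_line, later_line):
    """Seconds elapsed between the timestamps of two log lines."""
    def stamp(line):
        return datetime.strptime(line[:17], LOG_TIME_FORMAT)
    return (stamp(later_line) - stamp(earlier_line)).total_seconds()


removed = ("15/03/03 10:27:41 INFO storage.BlockManagerMasterActor: "
           "Removing block manager BlockManagerId(1, TMO-DN73, 4)")
lost = ("15/03/03 10:49:10 ERROR cluster.YarnClusterScheduler: "
        "Lost executor 1 on TMO-DN73: remote Akka client disassociated")

gap_seconds = seconds_between(removed, lost)
```

For these two lines the gap is 1289 seconds, i.e. 21 minutes and 29 seconds between the block manager being removed and the executor being reported as lost.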
One of the executor not getting StopExecutor message
Hi,

I am running a Spark application on YARN in cluster mode. One of my executors appears to be in a hung state for a long time, and finally gets killed by the driver. Unlike the other executors, it has not received the StopExecutor message from the driver.

Here are the logs at the end of this container (C_1):

15/02/26 18:17:07 DEBUG storage.BlockManagerSlaveActor: Done removing broadcast 36, response is 2
15/02/26 18:17:07 DEBUG storage.BlockManagerSlaveActor: Sent response: 2 to Actor[akka.tcp://sparkDriver@TMO-DN73:37906/temp/$aB]
15/02/26 18:17:09 DEBUG ipc.Client: IPC Client (1206963429) connection to TMO-GCR70/192.168.162.70:9000 from admin: closed
15/02/26 18:17:09 DEBUG ipc.Client: IPC Client (1206963429) connection to TMO-GCR70/192.168.162.70:9000 from admin: stopped, remaining connections 0
15/02/26 18:17:32 DEBUG hdfs.LeaseRenewer: Lease renewer daemon for [] with renew id 1 executed
15/02/26 18:18:00 DEBUG hdfs.LeaseRenewer: Lease renewer daemon for [] with renew id 1 expired
15/02/26 18:18:00 DEBUG hdfs.LeaseRenewer: Lease renewer daemon for [] with renew id 1 exited
15/02/26 20:33:13 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM

NOTE that it has no logs for more than 2 hours.

Here are the logs at the end of a normal container (C_2):

15/02/26 20:33:09 DEBUG storage.BlockManagerSlaveActor: Sent response: 2 to Actor[akka.tcp://sparkDriver@TMO-DN73:37906/temp/$D+b]
15/02/26 20:33:10 DEBUG executor.CoarseGrainedExecutorBackend: [actor] received message StopExecutor from Actor[akka.tcp://sparkDriver@TMO-DN73:37906/user/CoarseGrainedScheduler#160899257]
15/02/26 20:33:10 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
15/02/26 20:33:10 INFO storage.MemoryStore: MemoryStore cleared
15/02/26 20:33:10 INFO storage.BlockManager: BlockManager stopped
15/02/26 20:33:10 DEBUG executor.CoarseGrainedExecutorBackend: [actor] handled message (181.499835 ms) StopExecutor from Actor[akka.tcp://sparkDriver@TMO-DN73:37906/user/CoarseGrainedScheduler#160899257]
15/02/26 20:33:10 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/02/26 20:33:10 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/02/26 20:33:10 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
15/02/26 20:33:10 DEBUG ipc.Client: stopping client from cache: org.apache.hadoop.ipc.Client@76a68bd4
15/02/26 20:33:10 DEBUG ipc.Client: stopping client from cache: org.apache.hadoop.ipc.Client@76a68bd4
15/02/26 20:33:10 DEBUG ipc.Client: removing client from cache: org.apache.hadoop.ipc.Client@76a68bd4
15/02/26 20:33:10 DEBUG ipc.Client: stopping actual client because no more references remain: org.apache.hadoop.ipc.Client@76a68bd4
15/02/26 20:33:10 DEBUG ipc.Client: Stopping client
15/02/26 20:33:10 DEBUG storage.DiskBlockManager: Shutdown hook called
15/02/26 20:33:10 DEBUG util.Utils: Shutdown hook called

On the driver side, I can see the logs related to heartbeat messages from C_1 until 20:05:00:

15/02/26 20:05:00 DEBUG spark.HeartbeatReceiver: [actor] received message Heartbeat(7,[Lscala.Tuple2;@151e5ce6,BlockManagerId(7, TMO-DN73, 34106)) from Actor[akka.tcp://sparkExecutor@TMO-DN73:43671/temp/$fn]

After this, the driver continues to receive heartbeats from the other executors but not from this one, and here follows the message responsible for its SIGTERM:

15/02/26 20:06:20 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(7, TMO-DN73, 34106) with no recent heart beats: 80515ms exceeds 45000ms

I am using Spark 1.2.1.

Any pointer(s)?

Thanks,
Twinkle
Re: Strategy regarding maximum number of executor's failure for long running jobs/ spark streaming jobs
Hi,

Thanks, Sandy.

Another way to look at this: do we want our long-running application to die at all? Say we create a window of around 10 batches and use incremental operations inside the application, so a restart is relatively costly. Should the criterion for failing the application then be a maximum number of executor failures, or should we have parameters for a minimum number of available executors over some period x? That is, if the application cannot hold at least n executors within a period x, we fail it. Adding a time factor gives Spark a window in which to get more executors allocated if some of them fail.

Thoughts, please.

Thanks,
Twinkle

On Wed, Apr 1, 2015 at 10:19 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

That's a good question, Twinkle. One solution could be to allow a maximum number of failures within any given time span, e.g. a max failures per hour property.

-Sandy

On Tue, Mar 31, 2015 at 11:52 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote:

Hi,

In Spark on YARN, the property spark.yarn.max.executor.failures controls the maximum number of executor failures an application will survive. If the number of executor failures (for any reason, such as OOM or machine failure) exceeds this value, the application quits. For short Spark jobs this looks fine, but because the property does not take duration into account, long-running jobs get the same treatment in two different scenarios:

1. Executors failing within 5 minutes.
2. Executors failing sparsely, where at some point even a single executor failure (which the application could have survived) makes the application quit.

I am sending this to the community to hear what kind of behaviour or strategy people think is suitable for long-running Spark jobs and Spark Streaming jobs.

Thanks and Regards,
Twinkle
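The "minimum n executors within period x" criterion proposed in this reply can be sketched as a check over periodic samples of the live executor count. This is only an illustration of the proposal, not anything Spark implements; the function name, the sample format, and the parameter names are assumptions.

```python
def should_fail(samples, min_executors, grace_seconds):
    """Decide whether the application should be failed.

    samples: time-ordered list of (timestamp_seconds, live_executor_count).
    Returns True once the live count has stayed below min_executors for at
    least grace_seconds without recovering. Hypothetical helper.
    """
    below_since = None  # timestamp at which the count first dropped too low
    for ts, live in samples:
        if live >= min_executors:
            below_since = None  # recovered, so the grace period restarts
            continue
        if below_since is None:
            below_since = ts
        if ts - below_since >= grace_seconds:
            return True
    return False


# A short dip that recovers within the grace period is tolerated ...
print(should_fail([(0, 4), (60, 2), (120, 2), (180, 4)], 3, 300))   # False
# ... while a shortage lasting longer than the grace period is not.
print(should_fail([(0, 4), (60, 2), (200, 1), (400, 2)], 3, 300))   # True
```

The grace period is what gives the cluster manager time to replace lost executors before the application gives up.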
Re: RDD generated on every query
Hi,

If you keep the same Spark context, you can cache the query result by caching the table (sqlContext.cacheTable(tableName)). You could also have a look at Ooyala's Spark job server.

On Tue, Apr 14, 2015 at 11:36 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

You can use Tachyon-based storage for that, and every time the client queries, you just get the data from there.

Thanks
Best Regards

On Mon, Apr 6, 2015 at 6:01 PM, Siddharth Ubale siddharth.ub...@syncoms.com wrote:

Hi,

In our Spark web application, the RDD is regenerated every time a client sends a query request. Is there a way to build the RDD once and then run queries against it again and again on the active SparkContext?

Thanks,
Siddharth Ubale,
Synchronized Communications
Re: set spark.storage.memoryFraction to 0 when no cached RDD and memory area for broadcast value?
Hi,

In one of the applications we built, which had no clone stuff, we set spark.storage.memoryFraction to a very low value, and yes, that gave us performance benefits.

Regarding the broadcast issue, you should also look at the data you are trying to broadcast; sometimes creating that data structure on the executors themselves, as a singleton, helps.

Thanks,

On Tue, Apr 14, 2015 at 12:23 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

You could try leaving all the configuration values at their defaults, running your application, and seeing whether you still hit the heap issue. If so, try adding swap space to the machines, which will definitely help. Another way would be to set the heap space manually (export _JAVA_OPTIONS=-Xmx5g).

Thanks
Best Regards

On Wed, Apr 8, 2015 at 12:45 AM, Shuai Zheng szheng.c...@gmail.com wrote:

Hi All,

I am a bit confused about spark.storage.memoryFraction. It is used to set the memory area for RDD usage; does "RDD" here mean only cached and persisted RDDs? So if my program has no cached RDD at all (that is, I have no .cache() or .persist() call on any RDD), can I set spark.storage.memoryFraction to a very small number, or even zero?

I am writing a program which consumes a lot of memory (broadcast value, runtime, etc.). But I have no cached RDD, so should I just set spark.storage.memoryFraction to 0 (which would help me improve performance)?

I have another issue with broadcast: when I try to get a broadcast value, it throws an out-of-memory error. Which part of memory should I allocate more to (given that I can't increase my overall memory size)?
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:218)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.read(DefaultArraySerializers.java:200)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)
at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:136)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:549)
at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:431)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:167)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)

Regards,
Shuai
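To make the trade-off in this thread concrete, here is a rough arithmetic sketch of how much heap the storage region gets for a given spark.storage.memoryFraction. It assumes the Spark 1.x scheme in which the region is the executor heap multiplied by the memory fraction and by a safety fraction; the defaults used here (0.6 and 0.9) are assumptions to check against the documentation for your version, and the function name is made up.

```python
def storage_region_mb(heap_mb, memory_fraction=0.6, safety_fraction=0.9):
    """Approximate size of the block-storage region in MB.

    Assumed model: heap * spark.storage.memoryFraction * safety fraction.
    The default values are assumptions, not read from a running cluster.
    """
    return heap_mb * memory_fraction * safety_fraction


heap_mb = 5 * 1024  # the -Xmx5g heap mentioned earlier in the thread
for fraction in (0.6, 0.1, 0.0):
    print(fraction, round(storage_region_mb(heap_mb, fraction), 1))
```

One caution that the stack trace itself suggests: the broadcast value is read through BlockManager and MemoryStore (MemoryStore.unrollSafely, MemoryStore.putIterator), so broadcast blocks appear to share this storage region with cached RDDs. If that holds for your version, setting the fraction to 0 may leave no room for broadcast values, and a small non-zero value may be the safer experiment.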
Re: Addition of new Metrics for killed executors.
Hi Archit,

What is your use case, and what kind of metrics are you planning to add?

Thanks,
Twinkle

On Fri, Apr 17, 2015 at 4:07 PM, Archit Thakur archit279tha...@gmail.com wrote:

Hi,

We are planning to add new metrics in Spark for executors that got killed during execution. I was just curious why this information is not already present. Is there some reason for not adding it? Any ideas are welcome.

Thanks and Regards,
Archit Thakur.
Re: Addition of new Metrics for killed executors.
Hi,

Looks interesting. It would be good to know why these stats are not shown in the UI. Patrick W's description in https://spark-project.atlassian.net/browse/SPARK-999 does not mention any exception for failed tasks or executors. Can somebody please comment on whether this is a bug or intended behaviour, for performance reasons or because of some other bottleneck?

--Twinkle

On Mon, Apr 20, 2015 at 2:47 PM, Archit Thakur archit279tha...@gmail.com wrote:

Hi Twinkle,

We have a use case where we want to debug how and why an executor got killed. It could be because of a stack overflow, GC, or any other unexpected scenario. In the driver UI there is no information about killed executors, so I was curious how people usually debug these things, apart from scanning the logs and working it out from there.

The metrics we are planning to add are similar to what we have for non-killed executors (per-stage data specifically): numFailedTasks, executorRunTime, inputBytes, memoryBytesSpilled, etc. Apart from that, we also intend to add all the information present in the Executors tab for running executors.

Thanks,
Archit Thakur.

On Mon, Apr 20, 2015 at 1:31 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote:

Hi Archit,

What is your use case, and what kind of metrics are you planning to add?

Thanks,
Twinkle

On Fri, Apr 17, 2015 at 4:07 PM, Archit Thakur archit279tha...@gmail.com wrote:

Hi,

We are planning to add new metrics in Spark for executors that got killed during execution. I was just curious why this information is not already present. Is there some reason for not adding it? Any ideas are welcome.

Thanks and Regards,
Archit Thakur.
Re: FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle
Hi,

Can you please share the compression and related settings you are using?

Thanks,
Twinkle

On Wed, May 6, 2015 at 4:15 PM, Jianshi Huang jianshi.hu...@gmail.com wrote:

I'm facing this error in Spark 1.3.1: https://issues.apache.org/jira/browse/SPARK-4105

Does anyone know a workaround? Change the compression codec for shuffle output?

--
Jianshi Huang
LinkedIn: jianshi
Twitter: @jshuang
Github Blog: http://huangjs.github.com/
Regarding benefits of using more than one cpu for a task in spark
Hi,

In Spark there are two settings for the number of cores: one at the task level, spark.task.cpus, and another that sets the number of cores per executor, spark.executor.cores.

Apart from a task that calls some external API or similar, is there any other use case or benefit in assigning more than one core to a task? As far as I can see in the code, this setting is only used during scheduling; RDD partitions and the like are not affected by it.

Does this mean the developer has to write the application logic so that it actually uses the extra cores? (That, again, makes me question the purpose of this setting.)

Comments, please.

Thanks,
Twinkle
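The scheduling effect described in this question can be shown with a small arithmetic sketch: the number of tasks an executor runs at once is its core count divided by the cores reserved per task. This is a simplified model of the scheduler's slot accounting, not Spark code, and the function name is made up.

```python
def concurrent_tasks_per_executor(executor_cores, task_cpus=1):
    """Number of task slots on one executor under the simplified model
    slots = spark.executor.cores // spark.task.cpus. Hypothetical helper."""
    if task_cpus < 1:
        raise ValueError("task_cpus must be at least 1")
    return executor_cores // task_cpus


print(concurrent_tasks_per_executor(8, 1))  # 8
print(concurrent_tasks_per_executor(8, 2))  # 4
print(concurrent_tasks_per_executor(8, 3))  # 2 (two cores are left unused)
```

Under this model, raising spark.task.cpus only reserves more cores per task and so lowers concurrency. The task body still runs on a single thread unless the application code (or a library it calls) starts additional threads itself, which is consistent with the observation in the post that partitions are untouched by the setting.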
Re: Strategy regarding maximum number of executor's failure for long running jobs/ spark streaming jobs
Hi,

One rationale for killing the app could be to avoid skew in the data.

I have created this issue (https://issues.apache.org/jira/browse/SPARK-6735) to provide options for disabling this behaviour, as well as for making the number of executor failures relative to a window duration. I will upload the PR shortly.

Thanks,
Twinkle

On Tue, Apr 7, 2015 at 2:02 AM, Sandy Ryza sandy.r...@cloudera.com wrote:

What's the advantage of killing an application for lack of resources? I think the rationale behind killing an app based on executor failures is that, if we see a lot of them in a short span of time, it means there's probably something going wrong in the app or on the cluster.

On Wed, Apr 1, 2015 at 7:08 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote:

Hi,

Thanks, Sandy.

Another way to look at this: do we want our long-running application to die at all? Say we create a window of around 10 batches and use incremental operations inside the application, so a restart is relatively costly. Should the criterion for failing the application then be a maximum number of executor failures, or should we have parameters for a minimum number of available executors over some period x? That is, if the application cannot hold at least n executors within a period x, we fail it. Adding a time factor gives Spark a window in which to get more executors allocated if some of them fail.

Thoughts, please.

Thanks,
Twinkle

On Wed, Apr 1, 2015 at 10:19 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

That's a good question, Twinkle. One solution could be to allow a maximum number of failures within any given time span, e.g. a max failures per hour property.

-Sandy

On Tue, Mar 31, 2015 at 11:52 PM, twinkle sachdeva twinkle.sachd...@gmail.com wrote:

Hi,

In Spark on YARN, the property spark.yarn.max.executor.failures controls the maximum number of executor failures an application will survive. If the number of executor failures (for any reason, such as OOM or machine failure) exceeds this value, the application quits. For short Spark jobs this looks fine, but because the property does not take duration into account, long-running jobs get the same treatment in two different scenarios:

1. Executors failing within 5 minutes.
2. Executors failing sparsely, where at some point even a single executor failure (which the application could have survived) makes the application quit.

I am sending this to the community to hear what kind of behaviour or strategy people think is suitable for long-running Spark jobs and Spark Streaming jobs.

Thanks and Regards,
Twinkle
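The window-relative failure count discussed in this thread (a maximum number of failures within a time span, rather than over the whole lifetime of the job) can be sketched with a sliding window over failure timestamps. This illustrates the idea only; it is not the implementation attached to SPARK-6735, and the class, method, and parameter names are hypothetical.

```python
from collections import deque


class WindowedFailureTracker:
    """Counts executor failures inside a sliding time window (sketch)."""

    def __init__(self, max_failures, window_seconds):
        self.max_failures = max_failures
        self.window_seconds = window_seconds
        self._failures = deque()  # failure timestamps, oldest first

    def _evict(self, now):
        # Forget failures that have fallen out of the window.
        while self._failures and now - self._failures[0] > self.window_seconds:
            self._failures.popleft()

    def record_failure(self, ts):
        self._failures.append(ts)
        self._evict(ts)

    def should_abort(self, now):
        self._evict(now)
        return len(self._failures) > self.max_failures


def aborts(failure_times, max_failures, window_seconds):
    """True if the tracker would abort after any of the given failures."""
    tracker = WindowedFailureTracker(max_failures, window_seconds)
    for ts in failure_times:
        tracker.record_failure(ts)
        if tracker.should_abort(ts):
            return True
    return False


# Scenario 1 from the thread: failures bunched within a few minutes.
print(aborts([0, 60, 120, 180], max_failures=3, window_seconds=3600))       # True
# Scenario 2: the same number of failures spread out over hours.
print(aborts([0, 4000, 8000, 12000], max_failures=3, window_seconds=3600))  # False
```

With a lifetime counter, both scenarios would end the application at the fourth failure; the window is what lets the sparse case survive.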
Re: Spark Streaming to Kafka
Thanks, Saisai.

On Wed, May 20, 2015 at 11:23 AM, Saisai Shao sai.sai.s...@gmail.com wrote:

I think this is the PR you could refer to: https://github.com/apache/spark/pull/2994

2015-05-20 13:41 GMT+08:00 twinkle sachdeva twinkle.sachd...@gmail.com:

Hi,

Spark Streaming integrates nicely with consuming messages from Kafka, so I thought I would ask the forum: is there any implementation available for pushing data to Kafka from Spark Streaming as well? Any links will be helpful.

Thanks and Regards,
Twinkle
Spark Streaming to Kafka
Hi,

Spark Streaming integrates nicely with consuming messages from Kafka, so I thought I would ask the forum: is there any implementation available for pushing data to Kafka from Spark Streaming as well? Any links will be helpful.

Thanks and Regards,
Twinkle
Re: [Spark Streaming] Iterative programming on an ordered spark stream using Java?
Hi,

updateStateByKey: if you can briefly describe the issue you are facing with it, that would be great.

As for not keeping the whole dataset in memory, you can tweak the remember parameter so that checkpointing happens at an appropriate time.

Thanks
Twinkle

On Thursday, June 18, 2015, Nipun Arora nipunarora2...@gmail.com wrote:

Hi All,

I am updating my question to give more detail. I have also created a Stack Overflow question: http://stackoverflow.com/questions/30904244/iterative-programming-on-an-ordered-spark-stream-using-java-in-spark-streaming

Is there any way in Spark Streaming to keep data across multiple micro-batches of a sorted DStream, where the stream is sorted using timestamps (assuming monotonically arriving data)? Can anyone suggest how to keep data across iterations, where each iteration is an RDD being processed in a JavaDStream?

*What does iteration mean?*

I first sort the DStream by timestamp, assuming the data has arrived with monotonically increasing timestamps (no out-of-order records). I need a global HashMap X, which I would like to update using the values with timestamp t1, and then subsequently t1+1. Since the state of X itself affects the calculations, this has to be a linear operation: the operation at t1+1 depends on HashMap X, which depends on the data at and before t1.

*Application*

This is especially the case when one is trying to update a model, compare two sets of RDDs, or keep a global history of certain events that will affect operations in future iterations. I would like to keep some accumulated history for calculations: not the entire dataset, but certain events persisted for use in future DStream RDDs.

Thanks
Nipun

On Wed, Jun 17, 2015 at 11:15 PM, Nipun Arora nipunarora2...@gmail.com wrote:

Hi Silvio,

Thanks for your response. I should clarify: I would like to do updates on a structure iteratively. I am not sure whether updateStateByKey meets my criteria.

In the current situation, I can run some map-reduce tasks and generate a JavaPairDStream<Key,Value>. After this, my algorithm is necessarily sequential, i.e. I have sorted the data using the timestamp (within the messages), and I would like to iterate over it and maintain a state in which I can update a model. I tried using foreach/foreachRDD and collect to do this, but I can't seem to propagate values across micro-batches/RDDs.

Any suggestions?

Thanks
Nipun

On Wed, Jun 17, 2015 at 10:52 PM, Silvio Fiorito silvio.fior...@granturing.com wrote:

Hi, just answered in your other thread as well...

Depending on your requirements, you can look at the updateStateByKey API.

From: Nipun Arora
Date: Wednesday, June 17, 2015 at 10:51 PM
To: user@spark.apache.org
Subject: Iterative Programming by keeping data across micro-batches in spark-streaming?

Hi,

Is there any way in Spark Streaming to keep data across multiple micro-batches, like in a HashMap or something? Can anyone suggest how to keep data across iterations, where each iteration is an RDD being processed in a JavaDStream?

This is especially the case when I am trying to update a model, compare two sets of RDDs, or keep a global history of certain events that will affect operations in future iterations. I would like to keep some accumulated history for calculations: not the entire dataset, but certain events persisted for use in future JavaDStream RDDs.

Thanks
Nipun
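Since updateStateByKey is the suggestion in this thread, a plain-Python simulation of its semantics may help show how state carries from one micro-batch to the next. This is a sketch of the behaviour as generally described (the update function sees the new values for a key plus the previous state, and returning None drops the key); it does not use Spark, and the helper names are made up.

```python
def update_state_by_key(state, batch, update_fn):
    """Simulate one micro-batch of a stateful per-key update.

    state: dict of key -> previous state
    batch: list of (key, value) pairs for this micro-batch
    update_fn(new_values, old_state) -> new state, or None to drop the key
    """
    grouped = {}
    for key, value in batch:
        grouped.setdefault(key, []).append(value)

    new_state = {}
    # Keys with existing state are visited even when the batch has no
    # new values for them, so their state can be kept or expired.
    for key in set(state) | set(grouped):
        result = update_fn(grouped.get(key, []), state.get(key))
        if result is not None:
            new_state[key] = result
    return new_state


def running_sum(new_values, old_state):
    return (old_state or 0) + sum(new_values)


batches = [
    [("a", 1), ("b", 2)],  # micro-batch at t1
    [("a", 3)],            # micro-batch at t1+1
    [("b", 1), ("c", 5)],  # micro-batch at t1+2
]
state = {}
for batch in batches:
    state = update_state_by_key(state, batch, running_sum)
print(sorted(state.items()))  # [('a', 4), ('b', 3), ('c', 5)]
```

The state here plays the role of the "global HashMap X" from the question, but partitioned by key. If the computation truly needs one global structure updated in strict timestamp order, one option to consider is routing everything to a single key, at the cost of parallelism.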