Need something like "def groupByKeyWithRDD(partitioner: Partitioner): RDD[(K, RDD[V])] = ???"
Hi,

In our use of groupByKey(...): RDD[(K, Iterable[V])], there can be cases (extreme ones, admittedly) where the Iterable[V] associated with a single key is large enough to result in an OOM.

Is it possible to provide the 'groupByKeyWithRDD' above? Ideally, the internal implementation of the RDD[V] would be smart enough to spill data to disk only once a configured threshold is reached. That way, we would not sacrifice performance in the normal cases.

Any suggestions or comments are welcome. Thanks a lot!

Just a side note: we do understand the points made here: https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html. However, 'reduceByKey' and 'foldByKey' don't quite fit our needs right now; that is to say, we cannot really avoid 'groupByKey'.

-- ChuChao
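One pattern that may approximate the request without a new API is to sort records by key inside each partition (pair RDDs offer repartitionAndSortWithinPartitions for this) and then walk each key's values lazily rather than materializing an Iterable. The sketch below is plain Python with no Spark dependency and shows only the iteration pattern; `stream_groups` and `summarize` are hypothetical names, and the count-and-sum computation is just a stand-in for whatever per-key work is needed.

```python
from itertools import groupby
from operator import itemgetter


def stream_groups(sorted_pairs):
    """Yield (key, lazy_values) from an iterator of (key, value) pairs sorted by key.

    Each lazy_values iterator must be fully consumed before moving on to the
    next key, because all groups share the same underlying iterator.
    """
    for key, group in groupby(sorted_pairs, key=itemgetter(0)):
        yield key, (value for _, value in group)


def summarize(sorted_pairs):
    """Hypothetical per-key computation: count and sum in a single pass."""
    results = []
    for key, values in stream_groups(sorted_pairs):
        count, total = 0, 0
        for v in values:  # values are pulled one at a time, never held as a list
            count += 1
            total += v
        results.append((key, count, total))
    return results


# Stand-in for one partition whose records are already sorted by key.
pairs = iter(sorted([("b", 5), ("a", 1), ("a", 2), ("b", 7), ("a", 3)]))
print(summarize(pairs))
```

The trade-off is that the per-key logic has to be written as a single forward pass; anything that needs random access to all values of a key would still have to buffer them.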
Re: Spark ClosureCleaner or java serializer OOM when trying to grow
Confirming that I am also hitting the same errors.

host: r3.8xlarge

configuration:
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 200g
spark.serializer.objectStreamReset 10
spark.local.dir /mnt/rohanp-data2/
spark.driver.maxResultSize 0

Error:
15/11/14 08:03:49 INFO BlockManagerInfo: Added broadcast_5_piece32 in memory on localhost:56741 (size: 1728.1 KB, free: 103.1 GB)
15/11/14 08:03:49 INFO SparkContext: Created broadcast 5 from broadcast at Word2Vec.scala:286
Exception in thread "main" java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
    at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:288)
    at Word2VecApp$.main(Word2VecApp.scala:13)
    at Word2VecApp.main(Word2VecApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/11/14 08:04:21 INFO SparkContext: Invoking stop() from shutdown hook
15/11/14 08:04:21 INFO SparkUI: Stopped Spark web UI at http://10.144.64.249:4040
15/11/14 08:04:21 INFO DAGScheduler: Stopping DAGScheduler

Code:

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import org.apache.spark.SparkConf

object Word2VecApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Word2Vec Application")
    val sc = new SparkContext(conf)
    val input = sc.textFile("/mnt/rohanp-data1/data/training_queries.txt").map(line => line.split(" ").toSeq)
    val word2vec = new Word2Vec()
    val model = word2vec.fit(input)
    val synonyms = model.findSynonyms("china", 40)
    for ((synonym, cosineSimilarity) <- synonyms) {
      println(s"$synonym $cosineSimilarity")
    }
  }
}

start command:
../spark-submit --class "Word2VecApp" --master local[30] target/scala-2.10/word2vec-project_2.10-1.0.jar

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ClosureCleaner-or-java-serializer-OOM-when-trying-to-grow-tp24796p25383.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Kafka Offsets after application is restarted using Spark Streaming Checkpointing
Hi Cody,

Thanks for the clarification. I will try to come up with a workaround.

I have another question. When my job is restarted and recovers from the checkpoint, it does the re-partitioning step twice for each 15-minute job until the 2-hour window is complete. After that, the re-partitioning takes place only once.

For example, when the job recovers at 16:15, it does re-partitioning for the 16:15 Kafka stream and for the 14:15 Kafka stream as well. Also, all the other intermediate stages are computed for the 10:00 batch. I am using reduceByKeyAndWindow with an inverse function. Once the 2-hour window is complete, i.e. at 18:15, repartitioning takes place only once. It seems the checkpoint does not store RDDs beyond 2 hours, which is my window duration. Because of this, my job takes more time than usual.

Is there a way, or some configuration parameter, that would help avoid repartitioning twice?

I am attaching a snapshot of this.

Thanks!!
Kundan

On Fri, Nov 13, 2015 at 8:48 PM, Cody Koeninger wrote:

> Unless you change maxRatePerPartition, a batch is going to contain all of
> the offsets from the last known processed to the highest available.
>
> Offsets are not time-based, and Kafka's time-based api currently has very
> poor granularity (it's based on filesystem timestamp of the log segment).
> There's a kafka improvement proposal to add time-based indexing, but I
> wouldn't expect it soon.
>
> Basically, if you want batches to relate to time even while your spark job
> is down, you need an external process to index Kafka and do some custom
> work to use that index to generate batches.
>
> Or (preferably) embed a time in your message, and do any time-based
> calculations using that time, not time of processing.
>
> On Fri, Nov 13, 2015 at 4:36 AM, kundan kumar wrote:
>
>> Hi,
>>
>> I am using the Spark Streaming checkpointing mechanism and reading the
>> data from Kafka. The window duration for my application is 2 hrs with a
>> sliding interval of 15 minutes.
>>
>> So, my batches run at the following intervals...
>> 09:45
>> 10:00
>> 10:15
>> 10:30 and so on
>>
>> Suppose my running batch dies at 09:55 and I restart the application at
>> 12:05; then the flow is something like:
>>
>> At 12:05 it would run the 10:00 batch -> would this read the Kafka
>> offsets from the time it went down (or 9:45) to 12:00? Or just up to
>> 10:10?
>> Then next would be the 10:15 batch - what would be the offsets as input
>> for this batch? ...and so on for all the queued batches.
>>
>> Basically, my requirement is that when the application is restarted at
>> 12:05, it should read the Kafka offsets till 10:00, and then the next
>> queued batch takes offsets from 10:00 to 10:15, and so on until all the
>> queued batches are processed.
>>
>> If this is how offsets are handled for all the queued batches, then I
>> am fine.
>>
>> Or else please provide suggestions on how this can be done.
>>
>> Thanks!!!
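To make Cody's first point concrete, here is a simplified model, not Spark's actual code, of how the end offset of a batch could be chosen per Kafka partition. With no rate limit, the first batch after a restart runs to the highest available offset; with a limit (the direct stream reads it from spark.streaming.kafka.maxRatePerPartition, in messages per second per partition) each batch is capped. The function name `until_offsets` and the sample offsets are assumptions for illustration, and `None` is used here to model the unset case.

```python
def until_offsets(from_offsets, latest_offsets, max_rate_per_partition=None, batch_seconds=900):
    """Pick the end offset of the next batch for each Kafka partition.

    from_offsets / latest_offsets: dict of partition -> offset.
    max_rate_per_partition: messages per second per partition, or None for no cap.
    batch_seconds: batch interval in seconds (900 s = the 15 minutes in this thread).
    """
    result = {}
    for partition, start in from_offsets.items():
        latest = latest_offsets[partition]
        if max_rate_per_partition is None:
            # No cap: the batch swallows everything that accumulated while down.
            result[partition] = latest
        else:
            cap = start + max_rate_per_partition * batch_seconds
            result[partition] = min(cap, latest)
    return result


# Hypothetical offsets: partition 0 built up a large backlog during the outage.
from_offsets = {0: 1000, 1: 2000}
latest_offsets = {0: 500000, 1: 2500}

print(until_offsets(from_offsets, latest_offsets))
print(until_offsets(from_offsets, latest_offsets, max_rate_per_partition=100))
```

Note that the cap bounds batch size only; it does not make offsets line up with wall-clock times such as 10:00 or 10:15, which is why embedding a timestamp in each message is the suggestion for time-based calculations.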
Re: spark 1.4 GC issue
I have tried with G1 GC. If anyone can share their GC settings, please do.

At the code level I am:
1. reading an ORC table using a DataFrame
2. mapping the DataFrame to an RDD of my case class
3. changing that RDD to a paired RDD
4. applying combineByKey
5. saving the result to an ORC file

Please suggest.

Regards,
Renu Yadav

On Fri, Nov 13, 2015 at 8:01 PM, Renu Yadav wrote:

> I am using spark 1.4 and my application is spending much of its time in GC,
> around 60-70% of the time for each task.
>
> I am using parallel GC.
> Please help, somebody, as soon as possible.
>
> Thanks,
> Renu
Re: Very slow startup for jobs containing millions of tasks
Which release are you using?
If older than 1.5.0, you are missing some fixes such as SPARK-9952.

Cheers

On Sat, Nov 14, 2015 at 6:35 PM, Jerry Lam wrote:

> Hi spark users and developers,
>
> Has anyone experienced slow startup of a job when it contains a stage
> with over 4 million tasks?
> The job has been pending for 1.4 hours without doing anything (please
> refer to the attached pictures). However, the driver is busy doing
> something. I ran jstack on the driver and found the following relevant:
>
> ```
> "dag-scheduler-event-loop" daemon prio=10 tid=0x7f24a8c59800 nid=0x454 runnable [0x7f23b3e29000]
>    java.lang.Thread.State: RUNNABLE
> at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:231)
> at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:231)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:230)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1399)
> at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1373)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:911)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:910)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:910)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:834)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:837)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:836)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:836)
> at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:818)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1453)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1445)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> ```
>
> It seems that it takes a long time for the driver to create/schedule the
> DAG for that many tasks. Is there a way to speed it up?
>
> Best Regards,
>
> Jerry
Re: Very slow startup for jobs containing millions of tasks
Hi Ted, That looks exactly what happens. It has been 5 hrs now. The code was built for 1.4. Thank you very much! Best Regards, Jerry
Calculating Timeseries Aggregation
Hi,

I am working on a requirement to calculate real-time metrics and am building a prototype on Spark Streaming. I need to build aggregates at the second, minute, hour and day level.

I am not sure whether I should calculate all these aggregates as different windowed functions on the input DStream, or whether I should use the updateStateByKey function instead. If I have to use updateStateByKey for these time series aggregations, how can I remove keys from the state after the different time periods have elapsed?

Please suggest.

Regards
SM
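A rough sketch of the updateStateByKey route, in plain Python with no Spark dependency: each event is fanned out into one key per granularity, and the update function returns None once a key has gone quiet for longer than its retention, which is how updateStateByKey drops a key from the state. All names here (`bucket_key`, `expand`, `update`) and the (total, last_seen) state layout are assumptions for illustration; in a real job `now` would come from the batch time and `update` would be bound to it (for example with functools.partial) before being passed to updateStateByKey.

```python
BUCKET_SECONDS = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}


def bucket_key(metric, epoch_seconds, level):
    """Key for the bucket containing epoch_seconds at the given granularity.

    Buckets are aligned to the epoch, so "day" buckets are UTC days.
    """
    width = BUCKET_SECONDS[level]
    return (metric, level, epoch_seconds - epoch_seconds % width)


def expand(metric, epoch_seconds, value):
    """Fan one event out into one (key, value) pair per granularity."""
    return [(bucket_key(metric, epoch_seconds, level), value) for level in BUCKET_SECONDS]


def update(new_values, state, now, retention_seconds):
    """State update in the shape updateStateByKey expects, plus two extra arguments.

    state is None for a new key, otherwise (running_total, last_seen_epoch).
    Returning None removes the key from the state.
    """
    total, last_seen = state if state is not None else (0, now)
    if new_values:
        return (total + sum(new_values), now)
    if now - last_seen > retention_seconds:
        return None
    return (total, last_seen)


for key, value in expand("cpu", 3725, 10):
    print(key, value)

state = update([1, 2], None, now=100, retention_seconds=60)
print(state)
print(update([], state, now=130, retention_seconds=60))
print(update([], state, now=200, retention_seconds=60))
```

A different retention per granularity (short for second buckets, long for day buckets) can be chosen by looking at the level stored in the key before the events are keyed, or by running one stateful stream per granularity.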
Spark SQL: filter if column substring does not contain a string
I'm using pyspark 1.3.0, and struggling with what should be simple. Basically, I'd like to run this:

    site_logs.filter(lambda r: 'page_row' in r.request[:20])

meaning that I want to keep rows that have 'page_row' in the first 20 characters of the request column. The following is the closest I've come up with:

    pages = site_logs.filter("request like '%page_row%'")

but that's missing the [:20] part. If I instead try the .like function from the Column API:

    birf.filter(birf.request.like('bi_page')).take(5)

I get...

    Py4JJavaError: An error occurred while calling o71.filter.
    : org.apache.spark.sql.AnalysisException: resolved attributes request missing from user_agent,status_code,log_year,bytes,log_month,request,referrer

What is the code to run this filter, and what are some recommended ways to learn the Spark SQL syntax?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-filter-if-column-substring-does-not-contain-a-string-tp25385.html
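One possible way to express the [:20] part is to apply a SQL substring to the column before the LIKE test. The helper below only builds the predicate string, so it runs without Spark; `first_n_like` is a hypothetical name, and the sketch assumes that `substr(column, 1, n)` is accepted by the SQL dialect of the Spark version in use (SQL substring positions start at 1, unlike Python slices). The DataFrame call in the trailing comment is the assumed usage and has not been run here.

```python
def first_n_like(column, needle, n):
    """Build a SQL predicate: do the first n characters of column contain needle?

    Quote characters in needle are rejected rather than escaped, to keep the
    sketch free of dialect-specific escaping rules.
    """
    if "'" in needle:
        raise ValueError("needle must not contain a single quote")
    return "substr({col}, 1, {n}) like '%{needle}%'".format(col=column, n=n, needle=needle)


expr = first_n_like("request", "page_row", 20)
print(expr)

# Assumed usage with the DataFrame from the question (not run here):
#   pages = site_logs.filter(expr)
```

One caveat with LIKE: the underscore is a single-character wildcard, so the pattern '%page_row%' also matches strings such as 'pageXrow'. If that matters, the Python lambda from the question applied to the underlying RDD gives an exact substring match.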
Re: send transformed RDD to s3 from slaves
Hi Walrus,

Try caching the results just before calling the rdd.count.

Regards,
Ajay

> On Nov 13, 2015, at 7:56 PM, Walrus theCat wrote:
>
> Hi,
>
> I have an RDD which crashes the driver when being collected. I want to send
> the data on its partitions out to S3 without bringing it back to the driver.
> I try calling rdd.foreachPartition, but the data that gets sent has not gone
> through the chain of transformations that I need. It's the data as it was
> ingested initially. After specifying my chain of transformations, but before
> calling foreachPartition, I call rdd.count in order to force the RDD to
> transform. The data it sends out is still not transformed. How do I get the
> RDD to send out transformed data when calling foreachPartition?
>
> Thanks
Re: very slow parquet file write
How are you writing it out? Can you post some code?

Regards
Sab

On 14-Nov-2015 5:21 am, "Rok Roskar" wrote:

> I'm not sure what you mean? I didn't do anything specifically to partition
> the columns
>
> On Nov 14, 2015 00:38, "Davies Liu" wrote:
>
>> Do you have partitioned columns?
>>
>> On Thu, Nov 5, 2015 at 2:08 AM, Rok Roskar wrote:
>> > I'm writing a ~100 GB pyspark DataFrame with a few hundred partitions
>> > into a parquet file on HDFS. I've got a few hundred nodes in the
>> > cluster, so for the size of file this is way over-provisioned (I've
>> > tried it with fewer partitions and fewer nodes, no obvious effect). I
>> > was expecting the dump to disk to be very fast -- the DataFrame is
>> > cached in memory and contains just 14 columns (13 are floats and one is
>> > a string). When I write it out in json format, this is indeed
>> > reasonably fast (though it still takes a few minutes, which is longer
>> > than I would expect).
>> >
>> > However, when I try to write a parquet file it takes way longer -- the
>> > first set of tasks finishes in a few minutes, but the subsequent tasks
>> > take more than twice as long or longer. In the end it takes over half
>> > an hour to write the file. I've looked at the disk I/O and cpu usage on
>> > the compute nodes and it looks like the processors are fully loaded
>> > while the disk I/O is essentially zero for long periods of time. I
>> > don't see any obvious garbage collection issues and there are no
>> > problems with memory.
>> >
>> > Any ideas on how to debug/fix this?
>> >
>> > Thanks!
Re: Spark and Spring Integrations
You are probably trying to access the Spring context from the executors after initializing it at the driver, and running into serialization issues.

You could instead use mapPartitions() and initialize the Spring context from within that.

That said, I don't think that will solve all of your issues, because you won't be able to use the other rich transformations in Spark.

I am afraid these two don't gel that well, unless all your context lookups for beans happen in the driver.

Regards
Sab

On 13-Nov-2015 4:17 pm, "Netai Biswas" wrote:

> Hi,
>
> I am facing an issue while integrating Spark with Spring.
>
> I am getting "java.lang.IllegalStateException: Cannot deserialize
> BeanFactory with id" errors for all beans. I have tried a few solutions
> available on the web. Please help me solve this issue.
>
> Few details:
>
> Java : 8
> Spark : 1.5.1
> Spring : 3.2.9.RELEASE
>
> Please let me know if you need more information or any sample code.
>
> Thanks,
> Netai
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Spring-Integrations-tp25375.html
Re: Spark and Spring Integrations
You could try to use akka actor system with apache spark, if you are intending to use it in online / interactive job execution scenario.
Re: send transformed RDD to s3 from slaves
Maybe you want to be using rdd.saveAsTextFile() ?
Spark job stuck with 0 input records
Hello,

We are running Spark on YARN, version 1.4.1.

java.vendor=Oracle Corporation
java.runtime.version=1.7.0_40-b43
datanucleus-core-3.2.10.jar
datanucleus-api-jdo-3.2.6.jar
datanucleus-rdbms-3.2.9.jar

Task from the stage page, sorted by Duration (descending):
Index / ID / Attempt: 1672060
Status: RUNNING
Locality Level: RACK_LOCAL
Executor ID / Host: 56 / foo1.net
Launch Time: 2015/11/14 15:28:56
Duration: 5.1 h
GC Time:
Input Size / Records: 0.0 B (hadoop) / 0
Write Time:
Shuffle Write Size / Records: 0.0 B / 0
Errors:

Task from the stage page, sorted by Input Size / Records (ascending):
Index / ID / Attempt: 130176 0
Status: RUNNING
Locality Level: RACK_LOCAL
Executor ID / Host: 19 / foo2.net
Launch Time: 2015/11/14 03:26:22
Duration: 16.8 h
GC Time: 82 ms
Input Size / Records: 0.0 B (hadoop) / 1659204
Write Time:
Shuffle Write Size / Records: 0.0 B / 0
Errors:

Our Spark jobs have been running fine until now. Suddenly we saw some lone executors that got 0 records to process and got stuck indefinitely. We killed some jobs which ran for 16+ hours.

This seems like a Spark bug. Is anyone aware of any issue in this version of Spark?