Need sth like "def groupByKeyWithRDD(partitioner: Partitioner): RDD[(K, RDD[V])] = ???"

2015-11-14 Thread chao chu
Hi,

In our use of groupByKey(...): RDD[(K, Iterable[V])], there may be cases
where, even for a single key (an extreme case, admittedly), the associated
Iterable[V] could result in an OOM.

Is it possible to provide the above 'groupByKeyWithRDD'?

And, ideally, the internal implementation of the RDD[V] would be smart
enough to spill data to disk only once a configured threshold is exceeded.
That way, we would not sacrifice performance in the normal cases.

Any suggestions/comments are welcome. Thanks a lot!

Just a side note: we do understand the points mentioned here:
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html,
but 'reduceByKey' and 'foldByKey' don't quite fit our needs right now;
that is to say, we can't really avoid 'groupByKey'.
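
One possible direction, sketched in plain Python rather than Spark: if the
(key, value) pairs within a partition arrive sorted by key, each key's values
can be consumed as a lazy iterator instead of a materialized Iterable[V]. In
Spark this would roughly correspond to repartitionAndSortWithinPartitions
followed by mapPartitions; that mapping is an assumption of this sketch, and
the names stream_groups and summarize_partition are hypothetical.

```python
from itertools import groupby
from operator import itemgetter


def stream_groups(sorted_pairs):
    """Yield (key, lazy_values) from an iterator of (key, value) pairs sorted by key.

    Each lazy_values iterator must be consumed before moving on to the next key,
    because it reads from the same underlying iterator.
    """
    for key, group in groupby(sorted_pairs, key=itemgetter(0)):
        yield key, (value for _, value in group)


def summarize_partition(sorted_pairs):
    """Example consumer: count and sum each key's values without building a list."""
    for key, values in stream_groups(sorted_pairs):
        count = 0
        total = 0
        for value in values:
            count += 1
            total += value
        yield key, count, total


pairs = iter([("a", 1), ("a", 2), ("a", 3), ("b", 10)])
print(list(summarize_partition(pairs)))  # [('a', 3, 6), ('b', 1, 10)]
```

The count-and-sum consumer is only a placeholder; any per-key logic that can
process values one at a time fits the same shape.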

-- 
ChuChao


Re: Spark ClosureCleaner or java serializer OOM when trying to grow

2015-11-14 Thread rohangpatil
Confirming that I am also hitting the same errors.

host: r3.8xlarge

configuration 

spark.serializer org.apache.spark.serializer.KryoSerializer 
spark.driver.memory 200g
spark.serializer.objectStreamReset 10   
spark.local.dir /mnt/rohanp-data2/  
spark.driver.maxResultSize 0

Error

15/11/14 08:03:49 INFO BlockManagerInfo: Added broadcast_5_piece32 in memory on localhost:56741 (size: 1728.1 KB, free: 103.1 GB)
15/11/14 08:03:49 INFO SparkContext: Created broadcast 5 from broadcast at Word2Vec.scala:286
Exception in thread "main" java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
    at org.apache.spark.mllib.feature.Word2Vec.fit(Word2Vec.scala:288)
    at Word2VecApp$.main(Word2VecApp.scala:13)
    at Word2VecApp.main(Word2VecApp.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/11/14 08:04:21 INFO SparkContext: Invoking stop() from shutdown hook
15/11/14 08:04:21 INFO SparkUI: Stopped Spark web UI at http://10.144.64.249:4040
15/11/14 08:04:21 INFO DAGScheduler: Stopping DAGScheduler

Code :

import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import org.apache.spark.SparkConf

object Word2VecApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Word2Vec Application")
    val sc = new SparkContext(conf)
    val input = sc.textFile("/mnt/rohanp-data1/data/training_queries.txt")
      .map(line => line.split(" ").toSeq)
    val word2vec = new Word2Vec()
    val model = word2vec.fit(input)
    val synonyms = model.findSynonyms("china", 40)
    for ((synonym, cosineSimilarity) <- synonyms) {
      println(s"$synonym $cosineSimilarity")
    }
  }
}

start command:
../spark-submit   --class "Word2VecApp"   --master local[30]  
target/scala-2.10/word2vec-project_2.10-1.0.jar



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ClosureCleaner-or-java-serializer-OOM-when-trying-to-grow-tp24796p25383.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka Offsets after application is restarted using Spark Streaming Checkpointing

2015-11-14 Thread kundan kumar
Hi Cody ,

Thanks for the clarification. I will try to come up with some workaround.

I have another question. When my job is restarted and recovers from the
checkpoint, it does the repartitioning step twice for each 15-minute job
until the 2-hour window is complete. After that, the repartitioning takes
place only once.

For example, when the job recovers at 16:15 it does repartitioning for the
16:15 Kafka stream and for the 14:15 Kafka stream as well. Also, all the other
intermediate stages are computed for the 10:00 batch. I am using
reduceByKeyAndWindow with an inverse function. Once the 2-hour window is
complete, i.e. at 18:15, repartitioning takes place only once. It seems that the
checkpoint does not have RDDs stored beyond 2 hours, which is my window
duration. Because of this, my job takes more time than usual.

Is there a way, or some configuration parameter, that would help avoid
repartitioning twice?

I am attaching a screenshot of this.

Thanks !!
Kundan

On Fri, Nov 13, 2015 at 8:48 PM, Cody Koeninger  wrote:

> Unless you change maxRatePerPartition, a batch is going to contain all of
> the offsets from the last known processed to the highest available.
>
> Offsets are not time-based, and Kafka's time-based api currently has very
> poor granularity (it's based on filesystem timestamp of the log segment).
> There's a kafka improvement proposal to add time-based indexing, but I
> wouldn't expect it soon.
>
> Basically, if you want batches to relate to time even while your spark job
> is down, you need an external process to index Kafka and do some custom
> work to use that index to generate batches.
>
> Or (preferably) embed a time in your message, and do any time-based
> calculations using that time, not time of processing.
>
> On Fri, Nov 13, 2015 at 4:36 AM, kundan kumar 
> wrote:
>
>> Hi,
>>
>> I am using spark streaming check-pointing mechanism and reading the data
>> from kafka. The window duration for my application is 2 hrs with a sliding
>> interval of 15 minutes.
>>
>> So, my batches run at following intervals...
>> 09:45
>> 10:00
>> 10:15
>> 10:30  and so on
>>
>> Suppose, my running batch dies at 09:55 and I restart the application at
>> 12:05, then the flow is something like
>>
>> At 12:05 it would run the 10:00 batch -> would this read the Kafka
>> offsets from the time it went down (or 9:45) to 12:00, or just up to
>> 10:10?
>> Next would be the 10:15 batch - what would the input offsets be for this
>> batch? ...and so on for all the queued batches.
>>
>>
>> Basically, my requirement is such that when the application is restarted
>> at 12:05 then it should read the kafka offsets till 10:00  and then the
>> next queued batch takes offsets from 10:00 to 10:15 and so on until all the
>> queued batches are processed.
>>
>> If this is how offsets are handled for all the queued batches, then I
>> am fine.
>>
>> Or else please provide suggestions on how this can be done.
>>
>>
>>
>> Thanks!!!
>>
>>
>
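
The suggestion above to embed a time in each message and compute on that
time can be sketched as follows. This is a minimal plain-Python illustration,
not Spark or Kafka code: the (event_time, payload) message layout and the
function names are assumptions, and the 15-minute window simply mirrors the
sliding interval mentioned in the thread.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=15)  # mirrors the 15-minute slide in the thread


def window_start(event_time, window=WINDOW):
    """Floor an event timestamp to the start of the window it falls in."""
    epoch = datetime(1970, 1, 1)
    seconds = int((event_time - epoch).total_seconds())
    size = int(window.total_seconds())
    return epoch + timedelta(seconds=seconds - seconds % size)


def bucket_by_event_time(messages):
    """Group (event_time, payload) messages by embedded time, not arrival time."""
    buckets = defaultdict(list)
    for event_time, payload in messages:
        buckets[window_start(event_time)].append(payload)
    return dict(buckets)


# Messages that all arrive in one large catch-up batch after a restart
# still land in the window given by their own timestamp.
messages = [
    (datetime(2015, 11, 13, 9, 55), "a"),
    (datetime(2015, 11, 13, 10, 1), "b"),
    (datetime(2015, 11, 13, 10, 14), "c"),
]
for start, payloads in sorted(bucket_by_event_time(messages).items()):
    print(start, payloads)
```

Because the bucket depends only on the embedded timestamp, it does not matter
how many offsets the first batch after a restart happens to contain.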


Re: spark 1.4 GC issue

2015-11-14 Thread Renu Yadav
I have tried G1 GC. Could anyone share their GC settings?
At the code level I am:
1. reading an ORC table using a DataFrame
2. mapping the DataFrame to an RDD of my case class
3. converting that RDD to a pair RDD
4. applying combineByKey
5. saving the result to an ORC file

Please suggest.

Regards,
Renu Yadav

On Fri, Nov 13, 2015 at 8:01 PM, Renu Yadav  wrote:

> I am using Spark 1.4 and my application is spending a lot of time in GC,
> around 60-70% of the time for each task.
>
> I am using parallel GC.
> Could somebody please help as soon as possible?
>
> Thanks,
> Renu
>


Re: Very slow startup for jobs containing millions of tasks

2015-11-14 Thread Ted Yu
Which release are you using?
If it is older than 1.5.0, you are missing some fixes such as SPARK-9952.

Cheers

On Sat, Nov 14, 2015 at 6:35 PM, Jerry Lam  wrote:

> Hi spark users and developers,
>
> Has anyone experienced slow startup of a job when it contains a stage
> with over 4 million tasks?
> The job has been pending for 1.4 hours without doing anything (please
> refer to the attached pictures). However, the driver is busy doing
> something. I ran jstack on the driver and found the following relevant thread:
>
> ```
> "dag-scheduler-event-loop" daemon prio=10 tid=0x7f24a8c59800 nid=0x454 runnable [0x7f23b3e29000]
>    java.lang.Thread.State: RUNNABLE
>     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:231)
>     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:231)
>     at scala.Option.getOrElse(Option.scala:120)
>     at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:230)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1399)
>     at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1373)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:911)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:910)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:910)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:834)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:837)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:836)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:836)
>     at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:818)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1453)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1445)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> ```
>
> It seems that it takes a long time for the driver to create/schedule the DAG
> for that many tasks. Is there a way to speed it up?
>
> Best Regards,
>
> Jerry
>
>
>


Re: Very slow startup for jobs containing millions of tasks

2015-11-14 Thread Jerry Lam
Hi Ted, 

That looks like exactly what is happening. It has been 5 hours now. The code was
built for 1.4. Thank you very much!

Best Regards,

Jerry


> On 14 Nov, 2015, at 11:21 pm, Ted Yu  wrote:
> 
> Which release are you using?
> If it is older than 1.5.0, you are missing some fixes such as SPARK-9952.
> 
> Cheers
> 
>> On Sat, Nov 14, 2015 at 6:35 PM, Jerry Lam  wrote:
>> Hi spark users and developers,
>> 
>> Has anyone experienced slow startup of a job when it contains a stage
>> with over 4 million tasks?
>> The job has been pending for 1.4 hours without doing anything (please refer
>> to the attached pictures). However, the driver is busy doing something.
>> I ran jstack on the driver and found the following relevant thread:
>> 
>> ```
>> "dag-scheduler-event-loop" daemon prio=10 tid=0x7f24a8c59800 nid=0x454 runnable [0x7f23b3e29000]
>>    java.lang.Thread.State: RUNNABLE
>>     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:231)
>>     at org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:231)
>>     at scala.Option.getOrElse(Option.scala:120)
>>     at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:230)
>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1399)
>>     at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1373)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:911)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:910)
>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:910)
>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:834)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:837)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:836)
>>     at scala.collection.immutable.List.foreach(List.scala:318)
>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:836)
>>     at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:818)
>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1453)
>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1445)
>>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> ```
>> 
>> It seems that it takes a long time for the driver to create/schedule the DAG
>> for that many tasks. Is there a way to speed it up?
>> 
>> Best Regards,
>> 
>> Jerry
>> 
>> 
> 


Calculating Timeseries Aggregation

2015-11-14 Thread Sandip Mehta
Hi,

I am working on a requirement to calculate real-time metrics and am building a
prototype on Spark Streaming. I need to build aggregates at the second, minute,
hour and day level.

I am not sure whether I should calculate all these aggregates as different
windowed functions on the input DStream or use the updateStateByKey function
instead. If I have to use updateStateByKey for these time series aggregations,
how can I remove keys from the state after different amounts of time have elapsed?
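
On the key-removal part of the question: in PySpark, updateStateByKey drops a
key when the update function returns None for it. The sketch below shows an
update function of that shape in plain Python. The (running_total, idle_batches)
state layout, the MAX_IDLE_BATCHES threshold and the function name are all
assumptions for illustration; a real job would likely use a different threshold
for each aggregation level.

```python
MAX_IDLE_BATCHES = 3  # hypothetical: drop a key after this many batches without data


def update_metric(new_values, state):
    """Update function in the shape updateStateByKey expects.

    new_values: list of values seen for the key in the current batch.
    state: previous (running_total, idle_batches) tuple, or None for a new key.
    Returning None tells Spark Streaming to remove the key from the state.
    """
    total, idle = state if state is not None else (0, 0)
    if new_values:
        return (total + sum(new_values), 0)
    idle += 1
    if idle >= MAX_IDLE_BATCHES:
        return None
    return (total, idle)


# Simulate one key across four batches: data once, then three empty batches.
state = None
for batch in ([1, 2], [], [], []):
    state = update_metric(batch, state)
    print(state)
# prints (3, 0), then (3, 1), then (3, 2), then None
```

With a DStream of (key, value) pairs, the function would be passed as
pairs.updateStateByKey(update_metric), which also requires checkpointing to be
enabled on the streaming context.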

Please suggest.

Regards
SM



Spark SQL: filter if column substring does not contain a string

2015-11-14 Thread YaoPau
I'm using pyspark 1.3.0, and struggling with what should be simple. 
Basically, I'd like to run this:

site_logs.filter(lambda r: 'page_row' in r.request[:20])

meaning that I want to keep rows that have 'page_row' in the first 20
characters of the request column.  The following is the closest I've come up
with:

pages = site_logs.filter("request like '%page_row%'")

but that's missing the [:20] part.  If I instead try the .like function from
the Column API:

birf.filter(birf.request.like('bi_page')).take(5)

I get... Py4JJavaError: An error occurred while calling o71.filter.
: org.apache.spark.sql.AnalysisException: resolved attributes request
missing from
user_agent,status_code,log_year,bytes,log_month,request,referrer


What is the code to run this filter, and what are some recommended ways to
learn the Spark SQL syntax?
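
One way to express the [:20] part is to apply a substring function inside the
SQL filter string. The sketch below only builds that string in plain Python so
it can be checked without a cluster. It assumes that the SQL parser in the
Spark version in use accepts substr(column, pos, len) with 1-based positions,
and the helper names are hypothetical. Note that in LIKE patterns the
underscore is a single-character wildcard, so 'page_row' matches slightly more
than the literal text.

```python
def first_chars_contain(column, needle, n=20):
    """Build a SQL filter string: the first n characters of column contain needle."""
    if "'" in needle:
        # Quote escaping is deliberately not handled in this sketch.
        raise ValueError("needle must not contain a single quote")
    return "substr({0}, 1, {1}) like '%{2}%'".format(column, n, needle)


def matches_locally(value, needle, n=20):
    """Pure-Python equivalent of the original lambda, useful for spot-checking rows."""
    return needle in value[:n]


expr = first_chars_contain("request", "page_row")
print(expr)  # substr(request, 1, 20) like '%page_row%'

# With the DataFrame from the question, the string would be used as:
#   pages = site_logs.filter(expr)
```

The Column API should allow the same thing via
site_logs.request.substr(1, 20).like('%page_row%'), assuming Column.substr is
available in the installed PySpark version.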



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-filter-if-column-substring-does-not-contain-a-string-tp25385.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: send transformed RDD to s3 from slaves

2015-11-14 Thread Ajay
Hi Walrus,

Try caching the RDD just before calling rdd.count.

Regards,
Ajay

> On Nov 13, 2015, at 7:56 PM, Walrus theCat  wrote:
> 
> Hi,
> 
> I have an RDD which crashes the driver when being collected.  I want to send 
> the data on its partitions out to S3 without bringing it back to the driver. 
> I try calling rdd.foreachPartition, but the data that gets sent has not gone 
> through the chain of transformations that I need.  It's the data as it was 
> ingested initially.  After specifying my chain of transformations, but before 
> calling foreachPartition, I call rdd.count in order to force the RDD to 
> transform.  The data it sends out is still not transformed.  How do I get the 
> RDD to send out transformed data when calling foreachPartition?
> 
> Thanks




Re: very slow parquet file write

2015-11-14 Thread Sabarish Sasidharan
How are you writing it out? Can you post some code?

Regards
Sab
On 14-Nov-2015 5:21 am, "Rok Roskar"  wrote:

> I'm not sure what you mean? I didn't do anything specifically to partition
> the columns
> On Nov 14, 2015 00:38, "Davies Liu"  wrote:
>
>> Do you have partitioned columns?
>>
>> On Thu, Nov 5, 2015 at 2:08 AM, Rok Roskar  wrote:
>> > I'm writing a ~100 Gb pyspark DataFrame with a few hundred partitions into a
>> > parquet file on HDFS. I've got a few hundred nodes in the cluster, so for
>> > the size of file this is way over-provisioned (I've tried it with fewer
>> > partitions and fewer nodes, no obvious effect). I was expecting the dump to
>> > disk to be very fast -- the DataFrame is cached in memory and contains just
>> > 14 columns (13 are floats and one is a string). When I write it out in json
>> > format, this is indeed reasonably fast (though it still takes a few minutes,
>> > which is longer than I would expect).
>> >
>> > However, when I try to write a parquet file it takes way longer -- the first
>> > set of tasks finishes in a few minutes, but the subsequent tasks take more
>> > than twice as long or longer. In the end it takes over half an hour to write
>> > the file. I've looked at the disk I/O and cpu usage on the compute nodes and
>> > it looks like the processors are fully loaded while the disk I/O is
>> > essentially zero for long periods of time. I don't see any obvious garbage
>> > collection issues and there are no problems with memory.
>> >
>> > Any ideas on how to debug/fix this?
>> >
>> > Thanks!
>> >
>> >
>>
>


Re: Spark and Spring Integrations

2015-11-14 Thread Sabarish Sasidharan
You are probably trying to access the Spring context from the executors
after initializing it at the driver, and running into serialization issues.

You could instead use mapPartitions() and initialize the spring context
from within that.
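
The per-partition initialization pattern can be sketched like this. It is a
plain-Python illustration of the shape of a mapPartitions function, not Spring
or Spark code: ExpensiveContext is a hypothetical stand-in for a
non-serializable resource such as a Spring context, and enrich is a placeholder
for a bean lookup and call.

```python
class ExpensiveContext(object):
    """Hypothetical stand-in for a non-serializable resource (e.g. a Spring context)."""

    instances = 0  # counts constructions, to show it happens once per partition

    def __init__(self):
        ExpensiveContext.instances += 1

    def enrich(self, record):
        return record.upper()


def process_partition(records):
    """Function in the shape mapPartitions expects: iterator in, iterator out."""
    context = ExpensiveContext()  # created where the data is, once per partition
    for record in records:
        yield context.enrich(record)


partition = iter(["a", "b", "c"])
print(list(process_partition(partition)))  # ['A', 'B', 'C']

# With an RDD, the function would be used as:
#   enriched = rdd.mapPartitions(process_partition)
```

Because the context is constructed inside the function, nothing but the
function itself has to be serialized and shipped to the executors.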

That said I don't think that will solve all of your issues because you
won't be able to use the other rich transformations in Spark.

I am afraid these two don't gel that well, unless all your
context lookups for beans happen in the driver.

Regards
Sab
On 13-Nov-2015 4:17 pm, "Netai Biswas"  wrote:

> Hi,
>
> I am facing an issue while integrating Spark with Spring.
>
> I am getting "java.lang.IllegalStateException: Cannot deserialize
> BeanFactory with id" errors for all beans. I have tried a few solutions
> available on the web. Please help me solve this issue.
>
> Few details:
>
> Java : 8
> Spark : 1.5.1
> Spring : 3.2.9.RELEASE
>
> Please let me know if you need more information or any sample code.
>
> Thanks,
> Netai
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Spring-Integrations-tp25375.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Spark and Spring Integrations

2015-11-14 Thread Muthu Jayakumar
You could try using the Akka actor system with Apache Spark, if you
intend to use it in an online / interactive job execution scenario.

On Sat, Nov 14, 2015, 08:19 Sabarish Sasidharan <
sabarish.sasidha...@manthan.com> wrote:

> You are probably trying to access the Spring context from the executors
> after initializing it at the driver, and running into serialization issues.
>
> You could instead use mapPartitions() and initialize the spring context
> from within that.
>
> That said I don't think that will solve all of your issues because you
> won't be able to use the other rich transformations in Spark.
>
> I am afraid these two don't gel that well, unless all your
> context lookups for beans happen in the driver.
>
> Regards
> Sab
> On 13-Nov-2015 4:17 pm, "Netai Biswas"  wrote:
>
>> Hi,
>>
>> I am facing an issue while integrating Spark with Spring.
>>
>> I am getting "java.lang.IllegalStateException: Cannot deserialize
>> BeanFactory with id" errors for all beans. I have tried a few solutions
>> available on the web. Please help me solve this issue.
>>
>> Few details:
>>
>> Java : 8
>> Spark : 1.5.1
>> Spring : 3.2.9.RELEASE
>>
>> Please let me know if you need more information or any sample code.
>>
>> Thanks,
>> Netai
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Spring-Integrations-tp25375.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>


Re: send transformed RDD to s3 from slaves

2015-11-14 Thread Andrew Ehrlich
Maybe you want to use rdd.saveAsTextFile()?

> On Nov 13, 2015, at 4:56 PM, Walrus theCat  wrote:
> 
> Hi,
> 
> I have an RDD which crashes the driver when being collected.  I want to send 
> the data on its partitions out to S3 without bringing it back to the driver. 
> I try calling rdd.foreachPartition, but the data that gets sent has not gone 
> through the chain of transformations that I need.  It's the data as it was 
> ingested initially.  After specifying my chain of transformations, but before 
> calling foreachPartition, I call rdd.count in order to force the RDD to 
> transform.  The data it sends out is still not transformed.  How do I get the 
> RDD to send out transformed data when calling foreachPartition?
> 
> Thanks






Spark job stuck with 0 input records

2015-11-14 Thread pratik khadloya
Hello,

We are running Spark on YARN, version 1.4.1:
java.vendor=Oracle Corporation
java.runtime.version=1.7.0_40-b43
datanucleus-core-3.2.10.jar
datanucleus-api-jdo-3.2.6.jar
datanucleus-rdbms-3.2.9.jar

Tasks sorted by Duration (descending):
  Index/ID/Attempt:             1672060
  Status:                       RUNNING
  Locality Level:               RACK_LOCAL
  Executor ID / Host:           56 / foo1.net
  Launch Time:                  2015/11/14 15:28:56
  Duration:                     5.1 h
  GC Time:
  Input Size / Records:         0.0 B (hadoop) / 0
  Write Time:
  Shuffle Write Size / Records: 0.0 B / 0
  Errors:

Tasks sorted by Input Size / Records (ascending):
  Index/ID/Attempt:             130176 0
  Status:                       RUNNING
  Locality Level:               RACK_LOCAL
  Executor ID / Host:           19 / foo2.net
  Launch Time:                  2015/11/14 03:26:22
  Duration:                     16.8 h
  GC Time:                      82 ms
  Input Size / Records:         0.0 B (hadoop) / 1659204
  Write Time:
  Shuffle Write Size / Records: 0.0 B / 0
  Errors:


Our Spark jobs had been running fine until now. Suddenly we saw some lone
executors that got 0 records to process and got stuck indefinitely. We killed
some jobs that had run for 16+ hours.

This seems like a Spark bug. Is anyone aware of any such issue in this version
of Spark?