Re: listening to recursive folder structures in s3 using pyspark streaming (textFileStream)

2016-02-17 Thread Shixiong(Ryan) Zhu
textFileStream doesn't support that. It only supports monitoring one folder. On Wed, Feb 17, 2016 at 7:20 AM, in4maniac wrote: > Hi all, > > I am new to pyspark streaming and I was following a tutorial I saw on the > internet > ( >
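
Not from the reply above, but a common workaround is to enumerate the sub-folders yourself and union one textFileStream per folder. A minimal sketch, assuming the folder list is known up front (the bucket paths below are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object MultiFolderStream {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("MultiFolderStream")
        val ssc = new StreamingContext(conf, Seconds(10))

        // textFileStream watches a single directory, so create one stream per
        // known sub-folder and union them (paths here are placeholders).
        val folders = Seq("s3n://bucket/logs/2016/01", "s3n://bucket/logs/2016/02")
        val lines = ssc.union(folders.map(ssc.textFileStream))

        lines.count().print()
        ssc.start()
        ssc.awaitTermination()
      }
    }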

Re: SparkListener onApplicationEnd processing an RDD throws exception because of stopped SparkContext

2016-02-17 Thread Shixiong(Ryan) Zhu
`onApplicationEnd` is posted when SparkContext is stopping, and you cannot submit any job to a stopping SparkContext. In general, SparkListener is used to monitor the job progress and collect job information, and you should not submit jobs there. Why not submit your jobs in the main thread? On
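
For illustration, a minimal listener sketch that only collects information and never submits jobs (the class and field names are made up); it is registered from the main thread:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobEnd}

    // Monitoring-only listener: records job counts, logs at shutdown.
    class MonitoringListener extends SparkListener {
      @volatile private var finishedJobs = 0

      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
        finishedJobs += 1
      }

      override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit = {
        // The context is already stopping here, so only log/flush collected state.
        println(s"Application ended at ${end.time}, jobs seen: $finishedJobs")
      }
    }

    // Registration (in the main thread, before running jobs):
    // sc.addSparkListener(new MonitoringListener)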

Re: Skip empty batches - spark streaming

2016-02-11 Thread Shixiong(Ryan) Zhu
Are you using a custom input dstream? If so, you can make the `compute` method return None to skip a batch. On Thu, Feb 11, 2016 at 1:03 PM, Sebastian Piu wrote: > I was wondering if there is there any way to skip batches with zero events > when streaming? > By skip I
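
For illustration, a minimal sketch of a custom input DStream whose compute returns None when a (made-up) driver-side fetch finds nothing:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{StreamingContext, Time}
    import org.apache.spark.streaming.dstream.InputDStream
    import scala.reflect.ClassTag

    // fetchBatch runs on the driver; returning None from compute() means no
    // job is generated for that batch interval.
    class SkippingInputDStream[T: ClassTag](
        _ssc: StreamingContext,
        fetchBatch: () => Seq[T]) extends InputDStream[T](_ssc) {

      override def start(): Unit = {}
      override def stop(): Unit = {}

      override def compute(validTime: Time): Option[RDD[T]] = {
        val data = fetchBatch()
        if (data.isEmpty) None                               // skip this batch
        else Some(context.sparkContext.parallelize(data))    // normal batch
      }
    }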

Re: Skip empty batches - spark streaming

2016-02-11 Thread Shixiong(Ryan) Zhu
is behaviour > > Thanks! > On 11 Feb 2016 9:07 p.m., "Shixiong(Ryan) Zhu" <shixi...@databricks.com> > wrote: > >> Are you using a custom input dstream? If so, you can make the `compute` >> method return None to skip a batch. >> >> On Thu, Feb 11

Re: Spark Streaming : Limiting number of receivers per executor

2016-02-10 Thread Shixiong(Ryan) Zhu
You can't. The number of cores must be greater than the number of receivers. On Wed, Feb 10, 2016 at 2:34 AM, ajay garg wrote: > Hi All, > I am running 3 executors in my spark streaming application with 3 > cores per executor. I have written my custom receiver

Re: [Spark 1.5+] ReceiverTracker seems not to stop Kinesis receivers

2016-02-09 Thread Shixiong(Ryan) Zhu
Could you do a thread dump in the executor that runs the Kinesis receiver and post it? It would be great if you could provide the executor log as well. On Tue, Feb 9, 2016 at 3:14 PM, Roberto Coluccio wrote: > Hello, > > can anybody kindly help me out a little bit

Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-04 Thread Shixiong(Ryan) Zhu
tion.Iterator$$anon$ > org.apache.spark.InterruptibleIterator# > scala.collection.IndexedSeqLike$Elements# > scala.collection.mutable.ArrayOps$ofRef# > java.lang.Object[]# > > > > > On Thu, Feb 4, 2016 at 7:14 PM, Shixiong(Ryan) Zhu < > shixi...@databricks.com> wrot

Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-04 Thread Shixiong(Ryan) Zhu
Hey Udo, mapWithState usually uses much more memory than updateStateByKey since it caches the states in memory. However, from your description, it looks like BlockGenerator cannot push data into BlockManager; there may be something wrong in BlockGenerator. Could you share the top 50 objects in the heap

Re: PairDStreamFunctions.mapWithState fails in case timeout is set without updating State[S]

2016-02-04 Thread Shixiong(Ryan) Zhu
Thanks for reporting it. I will take a look. On Thu, Feb 4, 2016 at 6:56 AM, Yuval.Itzchakov wrote: > Hi, > I've been playing with the experimental PairDStreamFunctions.mapWithState > feature and I seem to have stumbled across a bug, and was wondering if > anyone else has

Re: local class incompatible: stream classdesc serialVersionUID

2016-02-01 Thread Shixiong(Ryan) Zhu
I guess he used client mode and the local Spark version is 1.5.2 but the standalone Spark version is 1.5.1. In other words, he used a 1.5.2 driver to talk to 1.5.1 executors. On Mon, Feb 1, 2016 at 2:08 PM, Holden Karau wrote: > So I'm a little confused to exactly how

Re: Spark Streaming from existing RDD

2016-01-29 Thread Shixiong(Ryan) Zhu
Do you just want to write some unit tests? If so, you can use "queueStream" to create a DStream from a queue of RDDs. However, because it doesn't support metadata checkpointing, it's better to only use it in unit tests. On Fri, Jan 29, 2016 at 7:35 AM, Sateesh Karuturi <
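
For illustration, a minimal test-style sketch of queueStream (master, batch interval and data are placeholders):

    import scala.collection.mutable
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("queueStreamTest")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Each dequeued RDD becomes one batch when oneAtATime = true.
    val rddQueue = mutable.Queue[RDD[Int]]()
    val stream = ssc.queueStream(rddQueue, oneAtATime = true)
    stream.map(_ * 2).print()

    ssc.start()
    // Push RDDs built from test data; in a real test these would be fixtures.
    rddQueue += ssc.sparkContext.makeRDD(1 to 10)
    rddQueue += ssc.sparkContext.makeRDD(11 to 20)
    ssc.awaitTerminationOrTimeout(5000)
    ssc.stop()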

Re: Spark streaming and ThreadLocal

2016-01-29 Thread Shixiong(Ryan) Zhu
hat all threads inside the worker clean up the >> ThreadLocal once they are done with processing this task? >> >> Thanks >> NB >> >> >> On Fri, Jan 29, 2016 at 1:00 PM, Shixiong(Ryan) Zhu < >> shixi...@databricks.com> wrote: >> >>> Sp

Re: Spark streaming and ThreadLocal

2016-01-29 Thread Shixiong(Ryan) Zhu
anyways. > > Thanks > NB > > > On Fri, Jan 29, 2016 at 5:09 PM, Shixiong(Ryan) Zhu < > shixi...@databricks.com> wrote: > >> It looks weird. Why don't you just pass "new SomeClass()" to "somefunc"? >> You don't need to use ThreadLocal if ther

Re: mapWithState: remove key

2016-01-29 Thread Shixiong(Ryan) Zhu
1. To remove a state, you need to call "state.remove()". Returning None from the function only means that record is not emitted in the DStream's output; the state won't be removed unless you call "state.remove()". 2. For the NoSuchElementException, here is the doc for "State.get": /** * Get
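
For illustration, a sketch of a mapping function using the (Time, key, value, state) form, where a key's state is removed once a made-up threshold is reached; returning None only suppresses output for that record:

    import org.apache.spark.streaming.{State, StateSpec, Time}

    val trackState = (time: Time, key: String, value: Option[Int], state: State[Int]) => {
      val newCount = state.getOption().getOrElse(0) + value.getOrElse(0)
      if (newCount >= 100) {                  // arbitrary threshold for the example
        if (state.exists()) state.remove()    // actually drops the state for this key
        None                                  // no output for this record
      } else {
        state.update(newCount)
        Some((key, newCount))                 // emitted in the mapped DStream
      }
    }

    // pairs: DStream[(String, Int)]
    // val stateDStream = pairs.mapWithState(StateSpec.function(trackState))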

Re: Getting Exceptions/WARN during random runs for same dataset

2016-01-29 Thread Shixiong(Ryan) Zhu
It's a known issue. See https://issues.apache.org/jira/browse/SPARK-10719 On Thu, Jan 28, 2016 at 5:44 PM, Khusro Siddiqui wrote: > It is happening on random executors on random nodes. Not on any specific > node everytime. > Or not happening at all > > On Thu, Jan 28, 2016 at

Re: Spark streaming and ThreadLocal

2016-01-29 Thread Shixiong(Ryan) Zhu
Of course. If you use a ThreadLocal in a long-lived thread and forget to remove it, it's definitely a memory leak. On Thu, Jan 28, 2016 at 9:31 PM, N B wrote: > Hello, > > Does anyone know if there are any potential pitfalls associated with using > ThreadLocal variables in
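
For illustration, one way to avoid the leak is to confine the ThreadLocal to a task and remove it in a finally block before the pooled executor thread moves on; everything below (ExpensiveParser, Record, ParserHolder) is a made-up sketch:

    import org.apache.spark.rdd.RDD

    // Stand-in types for whatever per-thread resource is actually needed.
    case class Record(value: String)
    class ExpensiveParser { def parse(line: String): Record = Record(line.trim) }

    object ParserHolder {
      private val local = new ThreadLocal[ExpensiveParser] {
        override def initialValue(): ExpensiveParser = new ExpensiveParser()
      }
      def get: ExpensiveParser = local.get()
      def clear(): Unit = local.remove()   // avoids leaking into the pooled thread
    }

    def parsePartition(rdd: RDD[String]): RDD[Record] =
      rdd.mapPartitions { iter =>
        try {
          // Materialize deliberately so cleanup runs after all records are parsed.
          iter.map(line => ParserHolder.get.parse(line)).toVector.iterator
        } finally {
          ParserHolder.clear()
        }
      }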

Re: Spark streaming and ThreadLocal

2016-01-29 Thread Shixiong(Ryan) Zhu
> On Fri, Jan 29, 2016 at 12:12 PM, Shixiong(Ryan) Zhu < > shixi...@databricks.com> wrote: > >> Of course. If you use a ThreadLocal in a long-lived thread and forget to >> remove it, it's definitely a memory leak. >> >> On Thu, Jan 28, 2016 at 9:31 PM,

Re: streaming in 1.6.0 slower than 1.5.1

2016-01-28 Thread Shixiong(Ryan) Zhu
Hey Jesse, Could you provide the operators you are using? For the heap dump, it may not be a real memory leak. Since batches started to queue up, the memory usage should increase. On Thu, Jan 28, 2016 at 11:54 AM, Ted Yu wrote: > bq. The total size by class B is 3GB in 1.5.1

Re: Data not getting printed in Spark Streaming with print().

2016-01-28 Thread Shixiong(Ryan) Zhu
fileStream has a parameter "newFilesOnly". By default, it's true, which means processing only new files and ignoring existing files in the directory. So you need to ***move*** the files into the directory, otherwise it will ignore existing files. You can also set "newFilesOnly" to false. Then in the
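
For illustration, a sketch using the fileStream overload that exposes newFilesOnly (the path and filter below are placeholders):

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("fileStreamExample")
    val ssc = new StreamingContext(conf, Seconds(30))

    // newFilesOnly = false also picks up files that already exist in the
    // directory when the stream starts.
    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
        "hdfs:///data/incoming",
        (path: Path) => !path.getName.startsWith("."),
        newFilesOnly = false
      ).map(_._2.toString)

    lines.print()
    ssc.start()
    ssc.awaitTermination()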

Re: spark.kryo.classesToRegister

2016-01-27 Thread Shixiong(Ryan) Zhu
It depends. The default Kryo serializer cannot handle all cases. If you encounter any issue, you can follow the Kryo doc to set up custom serializer: https://github.com/EsotericSoftware/kryo/blob/master/README.md On Wed, Jan 27, 2016 at 3:13 AM, amit tewari wrote: > This

Re: FAIR scheduler in Spark Streaming

2016-01-26 Thread Shixiong(Ryan) Zhu
The number of concurrent Streaming jobs is controlled by "spark.streaming.concurrentJobs". It's 1 by default. However, you need to keep in mind that setting it to a bigger number will allow jobs of several batches to run at the same time. It's hard to predict the behavior and sometimes will
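
For illustration, a minimal sketch of setting it (the value 2 is arbitrary; use with care, since batches may then complete out of order):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("concurrentJobsExample")
      .set("spark.streaming.concurrentJobs", "2")   // allow 2 batch jobs at once
    val ssc = new StreamingContext(conf, Seconds(10))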

Re: Need a sample code to load XML files into cassandra database using spark streaming

2016-01-26 Thread Shixiong(Ryan) Zhu
You can use spark-xml to read the xml files. https://github.com/databricks/spark-xml has some examples. To save your results to cassandra, you can use spark-cassandra-connector: https://github.com/datastax/spark-cassandra-connector On Tue, Jan 26, 2016 at 10:10 AM, Sree Eedupuganti
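
For illustration, a rough sketch combining the two libraries (the rowTag, paths, keyspace/table/column names and Cassandra host are placeholders, and both packages must be on the classpath, e.g. via --packages):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import com.datastax.spark.connector._

    val conf = new SparkConf()
      .setAppName("xmlToCassandra")
      .set("spark.cassandra.connection.host", "127.0.0.1")   // placeholder host
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Read XML files into a DataFrame; "record" is whatever element wraps a row.
    val df = sqlContext.read
      .format("com.databricks.spark.xml")
      .option("rowTag", "record")
      .load("hdfs:///data/input/*.xml")

    // Write selected fields to a Cassandra table.
    df.map(row => (row.getAs[String]("id"), row.getAs[String]("payload")))
      .saveToCassandra("my_keyspace", "my_table", SomeColumns("id", "payload"))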

Re: streaming textFileStream problem - got only ONE line

2016-01-25 Thread Shixiong(Ryan) Zhu
Did you move the file into "hdfs://helmhdfs/user/patcharee/cerdata/", or write into it directly? `textFileStream` requires that files must be written to the monitored directory by "moving" them from another location within the same file system. On Mon, Jan 25, 2016 at 6:30 AM, patcharee

Re: mapWithState and context start when checkpoint exists

2016-01-25 Thread Shixiong(Ryan) Zhu
Hey Andrey, `ConstantInputDStream` doesn't support checkpoint as it contains an RDD field. It cannot resume from checkpoints. On Mon, Jan 25, 2016 at 1:12 PM, Andrey Yegorov wrote: > Hi, > > I am new to spark (and scala) and hope someone can help me with the issue > I

Re: Spark Streaming Write Ahead Log (WAL) not replaying data after restart

2016-01-25 Thread Shixiong(Ryan) Zhu
You need to define a create function and use StreamingContext.getOrCreate. See the example here: http://spark.apache.org/docs/latest/streaming-programming-guide.html#how-to-configure-checkpointing On Thu, Jan 21, 2016 at 3:32 AM, Patrick McGloin wrote: > Hi all, > >
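
For illustration, a minimal sketch of that pattern (the checkpoint directory and batch interval are placeholders); all DStream wiring must happen inside the create function so it can be restored from the checkpoint on restart:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "hdfs:///checkpoints/my-app"   // placeholder path

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("walExample")
        .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      val ssc = new StreamingContext(conf, Seconds(10))
      ssc.checkpoint(checkpointDir)
      // define the input DStreams and the processing graph here
      ssc
    }

    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()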

Re: updateStateByKey not persisting in Spark 1.5.1

2016-01-20 Thread Shixiong(Ryan) Zhu
Could you share your log? On Wed, Jan 20, 2016 at 7:55 AM, Brian London wrote: > I'm running a streaming job that has two calls to updateStateByKey. When > run in standalone mode both calls to updateStateByKey behave as expected. > When run on a cluster, however, it

Re: Container exited with a non-zero exit code 1-SparkJOb on YARN

2016-01-20 Thread Shixiong(Ryan) Zhu
Could you share your log? On Wed, Jan 20, 2016 at 5:37 AM, Siddharth Ubale < siddharth.ub...@syncoms.com> wrote: > > > Hi, > > > > I am running a Spark Job on the yarn cluster. > > The spark job is a spark streaming application which is reading JSON from > a kafka topic , inserting the JSON

Re: using spark context in map funciton TASk not serilizable error

2016-01-20 Thread Shixiong(Ryan) Zhu
You should not use SparkContext or RDD directly in your closures. Could you show the code of "method1"? Maybe you only need a join or something else. E.g., val cassandraRDD = sc.cassandraTable("keySpace", "tableName") reRDD.join(cassandraRDD).map().saveAsTextFile(outputDir) On Tue, Jan 19,
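
For illustration, a sketch of the join-based approach (keyspace, table, column names and types are placeholders); the cassandraTable method comes from the spark-cassandra-connector implicits, and the Cassandra RDD is built once on the driver instead of touching SparkContext inside map():

    import com.datastax.spark.connector._
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    def enrich(reRDD: RDD[(String, String)], sc: SparkContext, outputDir: String): Unit = {
      val cassandraRDD: RDD[(String, String)] =
        sc.cassandraTable[(String, String)]("keySpace", "tableName")
          .select("key", "value")            // placeholder column names

      reRDD.join(cassandraRDD)
        .map { case (key, (left, right)) => s"$key\t$left\t$right" }
        .saveAsTextFile(outputDir)
    }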

Re: Calling SparkContext methods in scala Future

2016-01-18 Thread Shixiong(Ryan) Zhu
Hey Marco, Since the code in the Future runs asynchronously, you cannot call "sparkContext.stop" at the end of "fetch" because the code in the Future may not have finished. However, the exception seems weird. Do you have a simple reproducer? On Mon, Jan 18, 2016 at 9:13 AM, Ted Yu
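
For illustration, a minimal sketch of waiting for the asynchronous work before stopping the context (the path and timeout are placeholders, and `sc` is assumed to be an existing SparkContext):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    import scala.concurrent.ExecutionContext.Implicits.global

    // Kick off the job asynchronously, but block until it finishes before
    // stopping the SparkContext.
    val f: Future[Long] = Future { sc.textFile("hdfs:///data/input").count() }
    val total = Await.result(f, 10.minutes)
    println(s"count = $total")
    sc.stop()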

Re: Spark Streaming: Does mapWithState implicitly partition the dsteram?

2016-01-18 Thread Shixiong(Ryan) Zhu
mapWithState uses HashPartitioner by default. You can use "StateSpec.partitioner" to set your custom partitioner. On Sun, Jan 17, 2016 at 11:00 AM, Lin Zhao wrote: > When the state is passed to the task that handles a mapWithState for a > particular key, if the key is
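
For illustration, a sketch of supplying an explicit partitioner through StateSpec (the partition count is arbitrary):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming.{State, StateSpec}

    val mappingFunc = (key: String, value: Option[Int], state: State[Int]) => {
      val sum = value.getOrElse(0) + state.getOption().getOrElse(0)
      state.update(sum)
      (key, sum)
    }

    // State (and the keyed data feeding it) is partitioned by this partitioner.
    val spec = StateSpec
      .function(mappingFunc)
      .partitioner(new HashPartitioner(32))

    // val stateStream = pairDStream.mapWithState(spec)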

Re: Incorrect timeline for Scheduling Delay in Streaming page in web UI?

2016-01-18 Thread Shixiong(Ryan) Zhu
Hey, did you mean that the scheduling delay timeline is incorrect because it's too short and some values are missing? A batch won't have a scheduling delay until it starts to run. In your example, a lot of batches are waiting, so they don't have a scheduling delay yet. On Sun, Jan 17, 2016 at

Re: 答复: 答复: 答复: 答复: spark streaming context trigger invoke stop why?

2016-01-18 Thread Shixiong(Ryan) Zhu
oint(RDD.scala:300) > > at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) > > at > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300) > > at org.a

Re: [Spark 1.6][Streaming] About the behavior of mapWithState

2016-01-15 Thread Shixiong(Ryan) Zhu
Hey Terry, That's expected. If you want to only output (1, 3), you can use "reduceByKey" before "mapWithState" like this: dstream.reduceByKey(_ + _).mapWithState(spec) On Fri, Jan 15, 2016 at 1:21 AM, Terry Hoo wrote: > Hi, > I am doing a simple test with mapWithState,
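
For illustration, a self-contained sketch of that pipeline: pre-aggregating each batch with reduceByKey means mapWithState sees one record per key per batch, so only the final (key, total) is emitted instead of one output per input record.

    import org.apache.spark.streaming.{State, StateSpec}
    import org.apache.spark.streaming.dstream.DStream

    def totals(pairs: DStream[(String, Int)]): DStream[(String, Int)] = {
      val spec = StateSpec.function((key: String, value: Option[Int], state: State[Int]) => {
        val sum = value.getOrElse(0) + state.getOption().getOrElse(0)
        state.update(sum)
        (key, sum)
      })
      // One record per key per batch reaches mapWithState.
      pairs.reduceByKey(_ + _).mapWithState(spec)
    }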

Re: 答复: 答复: 答复: spark streaming context trigger invoke stop why?

2016-01-15 Thread Shixiong(Ryan) Zhu
> allKafkaWindowData = > this.sparkReceiverDStream.createReceiverDStream(jsc,this.streamingConf.getWindowDuration(), > > this.streamingConf.getSlideDuration()); > > > > this.businessProcess(allKafkaWindowData); > > this.sleep(); > >jsc.start(); &

Re: Spark Streaming: custom actor receiver losing vast majority of data

2016-01-14 Thread Shixiong(Ryan) Zhu
Could you change MEMORY_ONLY_SER to MEMORY_AND_DISK_SER_2 and see if this still happens? It may be because you don't have enough memory to cache the events. On Thu, Jan 14, 2016 at 4:06 PM, Lin Zhao wrote: > Hi, > > I'm testing spark streaming with actor receiver. The actor

Re: How to bind webui to localhost?

2016-01-14 Thread Shixiong(Ryan) Zhu
Yeah, it's hard-coded as "0.0.0.0". Could you send a PR to add a configuration for it? On Thu, Jan 14, 2016 at 2:51 PM, Zee Chen wrote: > Hi, what is the easiest way to configure the Spark webui to bind to > localhost or 127.0.0.1? I intend to use this with ssh socks proxy to >

Re: Spark Streaming: custom actor receiver losing vast majority of data

2016-01-14 Thread Shixiong(Ryan) Zhu
:31:31 INFO storage.BlockManager: Removing RDD 44 > 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 43 > 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 42 > 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 41 > 16/01/15 00:31:31 INFO storage.BlockMana

Re: NPE when using Joda DateTime

2016-01-14 Thread Shixiong(Ryan) Zhu
Could you try to use "Kryo.setDefaultSerializer" like this: class YourKryoRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.setDefaultSerializer(classOf[com.esotericsoftware.kryo.serializers.JavaSerializer]) } } On Thu, Jan 14, 2016 at 12:54 PM, Durgesh
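
For illustration, a completed version of that snippet together with the SparkConf wiring (the package "com.example" is a placeholder for wherever the registrator class lives):

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.serializers.JavaSerializer
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.KryoRegistrator

    class YourKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        // Fall back to Kryo's JavaSerializer for classes without a dedicated
        // serializer, which the reply above suggests for the Joda DateTime NPE.
        kryo.setDefaultSerializer(classOf[JavaSerializer])
      }
    }

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "com.example.YourKryoRegistrator")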

Re: 答复: 答复: spark streaming context trigger invoke stop why?

2016-01-14 Thread Shixiong(Ryan) Zhu
Could you show your codes? Did you use `StreamingContext.awaitTermination`? If so, it will return if any exception happens. On Wed, Jan 13, 2016 at 11:47 PM, Triones,Deng(vip.com) < triones.d...@vipshop.com> wrote: > What’s more, I am running a 7*24 hours job , so I won’t call System.exit() > by

Re: Too many tasks killed the scheduler

2016-01-11 Thread Shixiong(Ryan) Zhu
Could you use "coalesce" to reduce the number of partitions? Shixiong Zhu On Mon, Jan 11, 2016 at 12:21 AM, Gavin Yue wrote: > Here is more info. > > The job stuck at: > INFO cluster.YarnScheduler: Adding task set 1.0 with 79212 tasks > > Then got the error: > Caused
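
For illustration, a minimal sketch assuming an existing SparkContext `sc` as in the shell (the path and target partition count are placeholders):

    // Collapse an over-partitioned input before scheduling work on it.
    // Without shuffle = true this merges partitions locally, avoiding a shuffle.
    val raw = sc.textFile("hdfs:///data/many-small-files/*")   // e.g. tens of thousands of partitions
    val compact = raw.coalesce(2000)                           // arbitrary, smaller partition count
    compact.count()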

Re: Kryo serializer Exception during serialization: java.io.IOException: java.lang.IllegalArgumentException:

2016-01-08 Thread Shixiong(Ryan) Zhu
Could you disable `spark.kryo.registrationRequired`? Some classes may not be registered but they work well with Kryo's default serializer. On Fri, Jan 8, 2016 at 8:58 AM, Ted Yu wrote: > bq. try adding scala.collection.mutable.WrappedArray > > But the hint said registering
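
For illustration, both options as a sketch (the class in the comment is only an example of what might need registering):

    import org.apache.spark.SparkConf

    // Option 1: stop requiring registration; unregistered classes fall back to
    // Kryo's default serializers.
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "false")

    // Option 2: keep registrationRequired = true and register the missing class:
    // conf.registerKryoClasses(Array(classOf[scala.collection.mutable.WrappedArray.ofRef[_]]))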

Re: Negative Number of Active Tasks in Spark UI

2016-01-05 Thread Shixiong(Ryan) Zhu
Did you enable "spark.speculation"? On Tue, Jan 5, 2016 at 9:14 AM, Prasad Ravilla wrote: > I am using Spark 1.5.2. > > I am not using Dynamic allocation. > > Thanks, > Prasad. > > > > > On 1/5/16, 3:24 AM, "Ted Yu" wrote: > > >Which version of Spark do

Re: Double Counting When Using Accumulators with Spark Streaming

2016-01-05 Thread Shixiong(Ryan) Zhu
Hey Rachana, There are two jobs in your codes actually: `rdd.isEmpty` and `rdd.saveAsTextFile`. Since you don't cache or checkpoint this rdd, it will execute your map function twice for each record. You can move "accum.add(1)" to "rdd.saveAsTextFile" like this: JavaDStream lines =
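
For illustration, a Scala sketch of the same idea (the original thread used the Java API; names and the output-path handling are made up): increment the accumulator only in the map that feeds saveAsTextFile, and cache the RDD so isEmpty and saveAsTextFile don't recompute the upstream lineage:

    import org.apache.spark.streaming.dstream.DStream

    def save(lines: DStream[String], outputPrefix: String): Unit = {
      val accum = lines.context.sparkContext.accumulator(0L, "records")
      lines.foreachRDD { (rdd, time) =>
        val cached = rdd.cache()                  // shared by both actions below
        if (!cached.isEmpty()) {
          cached.map { line => accum += 1L; line }   // counted only by the save job
            .saveAsTextFile(s"$outputPrefix-${time.milliseconds}")
        }
        cached.unpersist()
      }
    }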

Re: How to register a Tuple3 with KryoSerializer?

2015-12-30 Thread Shixiong(Ryan) Zhu
You can use "_", e.g., sparkConf.registerKryoClasses(Array(classOf[scala.Tuple3[_, _, _]])) Best Regards, Shixiong(Ryan) Zhu Software Engineer Databricks Inc. shixi...@databricks.com databricks.com <http://databricks.com/> On Wed, Dec 30, 2015 at 10:16 AM, Russ <russ.br..

Re: Spark Streaming : Output frequency different from read frequency in StatefulNetworkWordCount

2015-12-30 Thread Shixiong(Ryan) Zhu
You can use "reduceByKeyAndWindow", e.g., val lines = ssc.socketTextStream("localhost", ) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(60)) wordCounts.print() On Wed, Dec
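
For illustration, a completed, self-contained version of that suggestion (the port, master and batch interval are placeholders): counts are aggregated over a 60-second window and emitted once per 60 seconds.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowedWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))

    val lines = ssc.socketTextStream("localhost", 9999)   // placeholder port
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1))
      .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(60), Seconds(60))
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()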
