Re: Spark Streaming: custom actor receiver losing vast majority of data

2016-01-14 Thread Lin Zhao
Hi Shixiong,

Just figured it out. I was using .print() as the output operation, which appears to 
stop processing a batch once 10 elements have gone through. I changed it to a 
no-op foreachRDD and it works.

Thanks for jumping in to help me.
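For anyone hitting the same symptom: DStream.print() only needs the first few elements of each batch, so Spark computes just enough partitions to produce them and the upstream stages never see the rest. A minimal sketch of the change (the stream name is assumed; any action that touches every element would do):

```scala
// print() takes only the first ~10 elements of each RDD, so Spark
// computes just enough partitions to produce them; the rest of the
// batch is never pulled through the pipeline.
stream.print()

// A foreachRDD whose body touches every element forces the whole
// batch to be computed (a completely empty body would compute nothing).
stream.foreachRDD { rdd => rdd.foreach(_ => ()) }
```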

From: "Shixiong(Ryan) Zhu" <shixi...@databricks.com>
Date: Thursday, January 14, 2016 at 4:41 PM
To: Lin Zhao <l...@exabeam.com>
Cc: user <user@spark.apache.org>
Subject: Re: Spark Streaming: custom actor receiver losing vast majority of data

Could you post the code of MessageRetriever? Also, could you post a screenshot 
of the tasks for a batch and check the input size of those tasks? Given how many 
events there are, there should be a lot of blocks and therefore a lot of tasks.

On Thu, Jan 14, 2016 at 4:34 PM, Lin Zhao <l...@exabeam.com> wrote:
Hi Shixiong,

I tried this but it still happens. If it helps, it's 1.6.0 and runs on YARN. 
Batch duration is 20 seconds.

Some logs seemingly related to block manager:


16/01/15 00:31:25 INFO receiver.BlockGenerator: Pushed block 
input-0-1452817873000
16/01/15 00:31:27 INFO storage.MemoryStore: Block input-0-1452817879000 stored 
as bytes in memory (estimated size 60.1 MB, free 1563.4 MB)
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 31
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 30
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 29
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 28
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 27
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 26
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 25
16/01/15 00:31:28 INFO receiver.BlockGenerator: Pushed block 
input-0-1452817879000
16/01/15 00:31:28 INFO context.TimeSpanHDFSReader: descr="Waiting to read [win] 
message file(s) for 2015-12-17T21:00:00.000." module=TIMESPAN_HDFS_READER
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 38
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 37
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 36
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 35
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 34
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 33
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 32
16/01/15 00:31:29 INFO context.MessageRetriever: descr="Processed 4,636,461 
lines (80 s); 0 event/s (42,636 incoming msg/s) at 2015-12-17T21" 
module=MESSAGE_RETRIEVER
16/01/15 00:31:30 INFO storage.MemoryStore: Block input-0-1452817879200 stored 
as bytes in memory (estimated size 93.3 MB, free 924.9 MB)
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 45
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 44
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 43
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 42
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 41
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 40
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 39

From: "Shixiong(Ryan) Zhu" <shixi...@databricks.com>
Date: Thursday, January 14, 2016 at 4:13 PM
To: Lin Zhao <l...@exabeam.com>
Cc: user <user@spark.apache.org>
Subject: Re: Spark Streaming: custom actor receiver losing vast majority of data

MEMORY_AND_DISK_SER_2



Re: Spark Streaming: custom actor receiver losing vast majority of data

2016-01-14 Thread Shixiong(Ryan) Zhu
Could you change MEMORY_ONLY_SER to MEMORY_AND_DISK_SER_2 and see if this
still happens? It may be because you don't have enough memory to cache the
events.
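The suggested change, sketched against the receiver definition quoted below (identifiers come from the original code; this is only the storage-level swap, not a full implementation):

```scala
import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK_SER_2 spills received blocks to disk (with 2 replicas)
// when the memory store fills up, instead of losing them the way
// MEMORY_ONLY_SER can under memory pressure.
val stream = ssc.actorStream[(String, Message)](
  MessageRetriever.props("message-retriever",
    mrSections.head, conf, flowControlDef, None, None),
  "Martini",
  StorageLevel.MEMORY_AND_DISK_SER_2)
```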

On Thu, Jan 14, 2016 at 4:06 PM, Lin Zhao  wrote:

> Hi,
>
> I'm testing Spark Streaming with an actor receiver. The actor keeps calling
> store() to save a pair to Spark.
>
> Once the job is launched, everything looks good on the UI. Millions of
> events get through every batch. However, I added logging to the first step
> and found that only 20 or 40 events per batch actually reach the first
> mapper. Any idea what might be causing this?
>
> I also log in the custom receiver before the store() call, and it really
> is calling store() millions of times.
>
> The receiver definition looks like:
>
> val stream = ssc.actorStream[(String, Message)](
>   MessageRetriever.props("message-retriever",
>     mrSections.head, conf, flowControlDef, None, None),
>   "Martini", StorageLevel.MEMORY_ONLY_SER)
>
>
> The job looks like:
>
> stream.map { pair =>
>   // Only a handful gets logged although there are over 1 million in a batch
>   logger.info(s"before pipeline key=${pair._1}")
>   pair._2
> }.flatMap { m =>
>   // Event Builder
>   logger.info(s"event builder thread-id=${Thread.currentThread().getId} user=${m.fields.getOrElse('user, "NA")}")
>   ebHelper(m)
> }.map { e =>
>   // Event Normalizer
>   logger.info(s"normalizer thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
>   DefaultEventNormalizer.normalizeFields(e)
> }.map { e =>
>   logger.info(s"resolver thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
>   resolver(e)
> }.flatMap { e =>
>   // Event Discarder
>   logger.info(s"discarder thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
>   discarder(e)
> }.map { e =>
>   ep(e)
> }
>
>
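For context, the actor side of a receiver like this typically mixes in ActorHelper and hands each message to Spark via store(). A minimal sketch under the Spark 1.6 actor-receiver API (the actor and Message names are assumed, not from the original code):

```scala
import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

// Minimal receiver actor: each message received is passed to store(),
// which appends it to the block currently being assembled by the
// receiver's BlockGenerator.
class RetrieverActor extends Actor with ActorHelper {
  def receive = {
    case pair: (String, Message) @unchecked => store(pair)
  }
}
```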