Could you change MEMORY_ONLY_SER to MEMORY_AND_DISK_SER_2 and see if this
still happens? It may be because you don't have enough memory to cache the
events.
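
Something like this, keeping everything else in your receiver definition the
same (only the storage level changes). MEMORY_AND_DISK_SER_2 spills received
blocks to disk and replicates them, so blocks that don't fit in memory aren't
dropped:

val stream = ssc.actorStream[(String, Message)](
  MessageRetriever.props("message-retriever",
    mrSections.head, conf, flowControlDef, None, None),
  "Martini", StorageLevel.MEMORY_AND_DISK_SER_2)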

On Thu, Jan 14, 2016 at 4:06 PM, Lin Zhao <[email protected]> wrote:

> Hi,
>
> I'm testing spark streaming with actor receiver. The actor keeps calling
> store() to save a pair to Spark.
>
> Once the job is launched, everything looks good on the UI. Millions of
> events get through every batch. However, I added logging to the first step
> and found that only 20 or 40 events per batch actually get to the first
> mapper. Any idea what might be causing this?
>
> I also log in the custom receiver just before the store() call, and it
> really is calling that function millions of times.
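>
> The store() calls happen in the actor's receive, roughly like this
> (simplified sketch; the real MessageRetriever also does flow control):
>
> import akka.actor.Actor
> import org.apache.spark.streaming.receiver.ActorHelper
>
> class MessageRetriever(/* config, flow control, ... */) extends Actor with ActorHelper {
>   def receive = {
>     case (key: String, msg: Message) =>
>       store((key, msg)) // hit millions of times per batch
>   }
> }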
>
> The receiver definition looks like:
>
> val stream = ssc.actorStream[(String, Message)](
>   MessageRetriever.props("message-retriever",
>     mrSections.head, conf, flowControlDef, None, None),
>   "Martini", StorageLevel.MEMORY_ONLY_SER)
>
>
> The job looks like:
>
> stream.map { pair =>
>   logger.info(s"before pipeline key=${pair._1}") // Only a handful gets logged although there are over 1 million in a batch
>   pair._2
> }.flatMap { m =>
>   // Event Builder
>   logger.info(s"event builder thread-id=${Thread.currentThread().getId} user=${m.fields.getOrElse('user, "NA")}")
>   ebHelper(m)
> }.map { e =>
>   // Event Normalizer
>   logger.info(s"normalizer thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
>   DefaultEventNormalizer.normalizeFields(e)
> }.map { e =>
>   // Event Resolver
>   logger.info(s"resolver thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
>   resolver(e)
> }.flatMap { e =>
>   // Event Discarder
>   logger.info(s"discarder thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
>   discarder(e)
> }.map { e =>
>   ep(e)
> }
>
>
