Could you change MEMORY_ONLY_SER to MEMORY_AND_DISK_SER_2 and see if this still happens? It may be that you don't have enough memory to cache the received events: with MEMORY_ONLY_SER, received blocks that don't fit in memory get dropped, whereas MEMORY_AND_DISK_SER_2 spills them to disk and replicates them to a second node.
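Concretely, the only change would be the storage level in the actorStream call from your snippet (a minimal sketch reusing your MessageRetriever arguments as-is):

// Same receiver as before; MEMORY_AND_DISK_SER_2 spills serialized blocks
// to disk when memory runs short and replicates them to a second executor,
// instead of dropping them.
val stream = ssc.actorStream[(String, Message)](
  MessageRetriever.props("message-retriever", mrSections.head, conf,
    flowControlDef, None, None),
  "Martini",
  StorageLevel.MEMORY_AND_DISK_SER_2) // was StorageLevel.MEMORY_ONLY_SER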
On Thu, Jan 14, 2016 at 4:06 PM, Lin Zhao <[email protected]> wrote:
> Hi,
>
> I'm testing Spark Streaming with an actor receiver. The actor keeps calling
> store() to save a pair to Spark.
>
> Once the job is launched, everything looks good on the UI. Millions of
> events get through every batch. However, I added logging to the first step
> and found that only 20 or 40 events in a batch actually get to the first
> mapper. Any idea what might be causing this?
>
> I also log in the custom receiver before the store() call, and it really is
> calling this function millions of times.
>
> The receiver definition looks like:
>
> val stream = ssc.actorStream[(String, Message)](
>   MessageRetriever.props("message-retriever", mrSections.head, conf,
>     flowControlDef, None, None),
>   "Martini",
>   StorageLevel.MEMORY_ONLY_SER)
>
> The job looks like:
>
> stream.map { pair =>
>   logger.info(s"before pipeline key=${pair._1}") // Only a handful gets logged although there are over 1 million in a batch
>   pair._2
> }.flatMap { m =>
>   // Event Builder
>   logger.info(s"event builder thread-id=${Thread.currentThread().getId} user=${m.fields.getOrElse('user, "NA")}")
>   ebHelper(m)
> }.map { e =>
>   // Event Normalizer
>   logger.info(s"normalizer thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
>   DefaultEventNormalizer.normalizeFields(e)
> }.map { e =>
>   logger.info(s"resolver thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
>   resolver(e)
> }.flatMap { e =>
>   // Event Discarder
>   logger.info(s"discarder thread-id=${Thread.currentThread().getId} user=${e.getFieldAsString('user)}")
>   discarder(e)
> }.map { e =>
>   ep(e)
> }
