Beam streaming pipelines over the Spark runner are still experimental, so
there's nothing official, but at PayPal we're running a version which is
very similar to the Apache branch, especially if you're only executing
a Kafka-in-Kafka-out pipeline.

From my experience with Spark, this usually means that something caused
Spark to stop, and if no clear exception failed the job, it might
indicate insufficient resources.

Have you tried setting the option "--sparkMaster=local[*]"? It will use
the number of threads (Spark believes) your machine can provide instead of
the default 4 set in SparkPipelineOptions.
If resources are indeed the problem, you could also try increasing the batch
interval ("--batchIntervalMillis", default: 1000) and even bounding the
records read in each microbatch ("--maxRecordsPerBatch", default: -1).
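
To make that concrete, here is a minimal sketch of assembling those three
arguments before handing them to PipelineOptionsFactory.fromArgs(args) as in
your code. It assumes the --name=value argument form; the helper class
SparkTuningArgs and the values 2000 and 1000 are only illustrative, not
recommendations and not part of Beam.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Beam): builds the command-line
// arguments mentioned above, assuming the --name=value form.
final class SparkTuningArgs {

    static String[] build(String sparkMaster,
                          long batchIntervalMillis,
                          long maxRecordsPerBatch) {
        List<String> args = new ArrayList<>();
        // Use as many local threads as Spark thinks the machine offers.
        args.add("--sparkMaster=" + sparkMaster);
        // Longer micro-batches give each batch more time to complete.
        args.add("--batchIntervalMillis=" + batchIntervalMillis);
        // Bound the number of records read in each micro-batch.
        args.add("--maxRecordsPerBatch=" + maxRecordsPerBatch);
        return args.toArray(new String[0]);
    }

    public static void main(String[] unused) {
        // Illustrative values only; tune them for your machine.
        String[] args = build("local[*]", 2000L, 1000L);
        System.out.println(String.join(" ", args));
        // args would then be passed to
        // PipelineOptionsFactory.fromArgs(args).withValidation()
        //     .as(SparkPipelineOptions.class), as in the original code.
    }
}
```

If the job survives with a longer interval or a bounded batch size, that
would point at resources rather than at the pipeline itself.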

On Fri, Jan 20, 2017 at 10:08 PM Xu Mingmin <[email protected]> wrote:

> it's running in local-mode for test now. I tried with 0.5.0-SNAPSHOT, with
> the same error:
>
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already
> stopped! Dropping event
> SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver,
> localhost, 59305),rdd_30_0,StorageLevel(false, true, false, true,
> 1),11616,0,0))
> 17/01/20 12:01:30 INFO BlockManagerInfo: Added rdd_31_0 in memory on
> localhost:59305 (size: 1340.0 B, free: 1124.5 MB)
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already
> stopped! Dropping event
> SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver,
> localhost, 59305),rdd_31_0,StorageLevel(false, true, false, false,
> 1),1340,0,0))
> 17/01/20 12:01:30 INFO Executor: Finished task 0.0 in stage 15.0 (TID 19).
> 2956 bytes result sent to driver
> 17/01/20 12:01:30 INFO DAGScheduler: ResultStage 15 (DStream at
> SparkUnboundedSource.java:154) failed in 0.564 s
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already
> stopped! Dropping event
> SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@43f0ada9)
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already
> stopped! Dropping event
> SparkListenerJobEnd(3,1484942490827,JobFailed(org.apache.spark.SparkException:
> Job 3 cancelled because SparkContext was shut down))
>
>
> Btw, is there a runnable example for Spark streaming so I can refer to?
>
> Thanks!
> Mingmin
>
> On Fri, Jan 20, 2017 at 11:45 AM, Amit Sela <[email protected]> wrote:
>
> The WakeupException is being logged and not thrown (it is OK since the
> reader was closed due to end-of-microbatch), so I wonder what causes "ERROR
> StreamingListenerBus: StreamingListenerBus has already stopped".
>
> Are you running in local-mode ("local[*]") ? or over YARN ?
> Any specific options you're using ?
> Would you mind trying the Beam Snapshot ? 0.5.0-SNAPSHOT
>
> Amit.
>
> On Fri, Jan 20, 2017 at 9:20 PM Xu Mingmin <[email protected]> wrote:
>
> Hello all,
>
> I'm working on a streaming POC project, which is written with Beam API,
> and run on both FlinkRunner and SparkRunner. It works good on Flink,
> however I cannot run it on SparkRunner.
>
> Currently I run it locally, and get this exception:
>
> 17/01/20 11:13:49 WARN KafkaIO: Reader-0: exception while fetching latest
> offsets. ignored.
> org.apache.kafka.common.errors.WakeupException
>     at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:404)
>     at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
>     at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
>     at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
>     at
> org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:374)
>     at
> org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:341)
>     at
> org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:197)
>     at
> org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
>     at
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242)
>     at
> com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.updateLatestOffsets(KafkaIO.java:1059)
>     at
> com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.access$3(KafkaIO.java:1055)
>     at
> com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader$3.run(KafkaIO.java:966)
>     at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> 17/01/20 11:13:50 ERROR StreamingListenerBus: StreamingListenerBus has
> already stopped! Dropping event
> StreamingListenerOutputOperationCompleted(OutputOperationInfo(1484939618000
> ms,0,foreachRDD at
> UnboundedDataset.java:102,org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
>
> org.apache.beam.runners.spark.translation.streaming.UnboundedDataset.action(UnboundedDataset.java:102)
>
> org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:164)
>
> org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.create(SparkRunnerStreamingContextFactory.java:78)
>
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:706)
>
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:705)
> scala.Option.getOrElse(Option.scala:120)
>
> org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
>
> org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:705)
>
> org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:159)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:81)
> org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
>
> com.ebay.dss.beam.spark.streaming.KafkaInKafkaOut.main(KafkaInKafkaOut.java:132),Some(1484939629500),Some(1484939630152),None))
>
>
> Here's my code:
>
> SparkPipelineOptions options =
> PipelineOptionsFactory.fromArgs(args).withValidation()
>         .as(SparkPipelineOptions.class);// PipelineOptionsFactory.create();
>
> options.setRunner(SparkRunner.class);
> options.setStreaming(true);
>
> Pipeline pipeline = Pipeline.create(options);
>
> PCollection<String> inStream = pipeline.apply("source",
>
> KafkaIO.read().withBootstrapServers(KAFKA_BROKER_IN).withTopics(Arrays.asList(KAFKA_IN_TOPIC))
>                 .withKeyCoder(ByteArrayCoder.of())
>                 .withValueCoder(StringUtf8Coder.of())
> //                .withMaxNumRecords(5)
>                 .withoutMetadata()
>                 )...
>
>
> Environment:
>
> Apache-Beam : 0.4.0
> Kafka: 0.9/0.10 (test both)
> Spark: 1.6.3
>
>
> I can run it by adding withMaxNumRecords(), however it's batch-onetime
> then.
>
> Any suggestion?
>
> Thanks!
> Mingmin
>
>
>
