Not sure why this would cause the application to crash, but I can give some background about how the Spark runner reads microbatches from Kafka (and generally UnboundedSources):
Each microbatch has a pre-set interval, and the Spark runner applies bounds on the read from the source (Kafka). By default a time bound is set (10% of the interval, but no less than 200 msec; those knobs are available in PipelineOptions), and a bound on the number of records can also be set (explicitly, or by enabling Spark's backpressure mechanism). These bounds are meant to avoid reading-forever-but-never-processing: Spark is a microbatch engine, so the readers have to stop at some point and "release" the read RDD as part of the DStream. This approach is somewhat similar to Spark's direct Kafka integration in that it generates the stream's initial RDDs by itself (unlike Receivers), but it differs because of the bounds, which try to avoid overly long reads that would cause delays.

If you're interested in the internals of the design of Spark's read from UnboundedSources, you can find it here: <https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>

Back to your issue: since your pipeline only reads/writes and doesn't apply any time-consuming computations or shuffles, you could increase the read time percentage ("--readTimePercentage", a value in (0, 1) that defaults to 0.1).

Let me know if this helps.

Amit

On Sat, Jan 21, 2017 at 12:07 AM Xu Mingmin <[email protected]> wrote:

> Thanks @Amit.
>
> I tried the parameters mentioned, and it seems the core setting is
> '--batchIntervalMillis'. I can run the simple KafkaInKafkaOut job only with
> a value >= 3000, no matter the value for --maxRecordsPerBatch, or
> --sparkMaster=local[2] / --sparkMaster=local[*].
>
> Here's the Kafka consumer configuration; not sure why >=3000ms works.
>
> 17/01/20 13:27:54 INFO ConsumerConfig: ConsumerConfig values:
> metric.reporters = []
> metadata.max.age.ms = 300000
> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [***:9092]
> ssl.keystore.type = JKS
> enable.auto.commit = false
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id = consumer-6
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 2147483647
> check.crcs = true
> request.timeout.ms = 40000
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 524288
> sasl.login.class = null
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> group.id = Reader-0_offset_consumer_676510629_none
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> session.timeout.ms = 30000
> metrics.num.samples = 2
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> sasl.callback.handler.class = null
> auto.offset.reset = latest
>
> On Fri, Jan 20, 2017 at 12:19 PM, Amit Sela <[email protected]> wrote:
>
> Beam streaming pipelines over the spark runner are still experimental so
> there's nothing official, but at PayPal we're running a version which is
> very similar to the Apache branch, especially if you're only executing a
> Kafka-in-Kafka-out pipeline.
>
> From my experience with Spark, this usually means that something caused
> Spark to stop, and in case there was no clear exception failing the job it
> might indicate insufficient resources.
>
> Have you tried setting the option: "--sparkMaster local[*]" which will use
> the number of threads (spark believes) your machine can provide instead of
> the default 4 set in SparkPipelineOptions?
> If resources are indeed the problem you could also try increasing the
> batch interval (--batchIntervalMillis) (default: 1000) and even bounding
> the records read in each microbatch ("--maxRecordsPerBatch") (default: -1).
>
> On Fri, Jan 20, 2017 at 10:08 PM Xu Mingmin <[email protected]> wrote:
>
> It's running in local-mode for test now. I tried with 0.5.0-SNAPSHOT, with
> the same error:
>
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver, localhost, 59305),rdd_30_0,StorageLevel(false, true, false, true, 1),11616,0,0))
> 17/01/20 12:01:30 INFO BlockManagerInfo: Added rdd_31_0 in memory on localhost:59305 (size: 1340.0 B, free: 1124.5 MB)
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver, localhost, 59305),rdd_31_0,StorageLevel(false, true, false, false, 1),1340,0,0))
> 17/01/20 12:01:30 INFO Executor: Finished task 0.0 in stage 15.0 (TID 19). 2956 bytes result sent to driver
> 17/01/20 12:01:30 INFO DAGScheduler: ResultStage 15 (DStream at SparkUnboundedSource.java:154) failed in 0.564 s
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@43f0ada9)
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(3,1484942490827,JobFailed(org.apache.spark.SparkException: Job 3 cancelled because SparkContext was shut down))
>
> Btw, is there a runnable example for Spark streaming that I can refer to?
>
> Thanks!
> Mingmin
>
> On Fri, Jan 20, 2017 at 11:45 AM, Amit Sela <[email protected]> wrote:
>
> The WakeupException is being logged and not thrown (it is OK since the
> reader was closed due to end-of-microbatch), so I wonder what causes "ERROR
> StreamingListenerBus: StreamingListenerBus has already stopped".
>
> Are you running in local-mode ("local[*]") or over YARN?
> Any specific options you're using?
> Would you mind trying the Beam Snapshot, 0.5.0-SNAPSHOT?
>
> Amit.
>
> On Fri, Jan 20, 2017 at 9:20 PM Xu Mingmin <[email protected]> wrote:
>
> Hello all,
>
> I'm working on a streaming POC project, which is written with the Beam API
> and run on both FlinkRunner and SparkRunner. It works well on Flink,
> however I cannot run it on SparkRunner.
>
> Currently I run it locally, and get this exception:
>
> 17/01/20 11:13:49 WARN KafkaIO: Reader-0: exception while fetching latest offsets. ignored.
> org.apache.kafka.common.errors.WakeupException
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:404)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:374)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:341)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:197)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242)
>     at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.updateLatestOffsets(KafkaIO.java:1059)
>     at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.access$3(KafkaIO.java:1055)
>     at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader$3.run(KafkaIO.java:966)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> 17/01/20 11:13:50 ERROR StreamingListenerBus: StreamingListenerBus has already stopped! Dropping event StreamingListenerOutputOperationCompleted(OutputOperationInfo(1484939618000 ms,0,foreachRDD at UnboundedDataset.java:102,org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
> org.apache.beam.runners.spark.translation.streaming.UnboundedDataset.action(UnboundedDataset.java:102)
> org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:164)
> org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.create(SparkRunnerStreamingContextFactory.java:78)
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:706)
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:705)
> scala.Option.getOrElse(Option.scala:120)
> org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
> org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:705)
> org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:159)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:81)
> org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
> com.ebay.dss.beam.spark.streaming.KafkaInKafkaOut.main(KafkaInKafkaOut.java:132),Some(1484939629500),Some(1484939630152),None))
>
> Here's my code:
>
> SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
>     .as(SparkPipelineOptions.class);// PipelineOptionsFactory.create();
>
> options.setRunner(SparkRunner.class);
> options.setStreaming(true);
>
> Pipeline pipeline = Pipeline.create(options);
>
> PCollection<String> inStream = pipeline.apply("source",
>     KafkaIO.read().withBootstrapServers(KAFKA_BROKER_IN).withTopics(Arrays.asList(KAFKA_IN_TOPIC))
>         .withKeyCoder(ByteArrayCoder.of())
>         .withValueCoder(StringUtf8Coder.of())
>         // .withMaxNumRecords(5)
>         .withoutMetadata()
>     )...
>
> Environment:
>
> Apache-Beam : 0.4.0
> Kafka: 0.9/0.10 (tested both)
> Spark: 1.6.3
>
> I can run it by adding withMaxNumRecords(), however it's a one-time batch
> then.
>
> Any suggestion?
>
> Thanks!
> Mingmin
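
A side note on the read-time bound described at the top of this thread (10% of the batch interval, but no less than 200 msec): the arithmetic can be sketched in plain Java as below. This is only an illustration of the rule as stated in the thread, not the Spark runner's actual code. The class and method names (ReadBoundSketch, readTimeBound) and the minReadTime parameter are made up for the example; only the batch interval and readTimePercentage correspond to options named above.

```java
import java.time.Duration;

// Hypothetical sketch: computes how long a reader may read within one microbatch.
final class ReadBoundSketch {

    // Returns readTimePercentage of the batch interval, floored at minReadTime.
    static Duration readTimeBound(Duration batchInterval,
                                  double readTimePercentage,
                                  Duration minReadTime) {
        // The thread describes the percentage as a value in the open range (0, 1).
        if (readTimePercentage <= 0.0 || readTimePercentage >= 1.0) {
            throw new IllegalArgumentException("readTimePercentage must be in (0, 1)");
        }
        long proportionalMillis = Math.round(batchInterval.toMillis() * readTimePercentage);
        return Duration.ofMillis(Math.max(proportionalMillis, minReadTime.toMillis()));
    }

    public static void main(String[] args) {
        Duration minReadTime = Duration.ofMillis(200); // floor mentioned in the thread
        long[] batchIntervalsMillis = {1000, 3000, 10000};
        for (long interval : batchIntervalsMillis) {
            Duration bound = readTimeBound(Duration.ofMillis(interval), 0.1, minReadTime);
            System.out.println(interval + " ms interval -> read for up to "
                    + bound.toMillis() + " ms");
        }
    }
}
```

Under these assumptions, the default 1000 msec interval with a 0.1 percentage falls back to the 200 msec floor, and raising either --batchIntervalMillis or --readTimePercentage lengthens the read window.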
