You're right that in the current implementation the Kafka consumer is initialized for each micro-batch, which is no different from direct Kafka in native Spark (this is because RDDs and their execution context only live as long as the batch is processed; an implementation based on receivers would be long-lived, but has resilience issues).

I wonder why you would require 25s with Kafka 0.10 + external auth - is the authentication taking long? Setting auth aside, I did notice a funny behaviour where, although a new consumer is created each micro-batch, the first one takes far longer than the following ones, which is actually good. One way to improve initialization in case of a large overhead could be to replace the creation of Readers with a lazily initialized connection pool via broadcast variables - filed BEAM-1294 <https://issues.apache.org/jira/browse/BEAM-1294>.

Amit.
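As a minimal sketch of the lazy-init idea above (illustrative only, not the actual KafkaIO or BEAM-1294 code), assuming a process-wide static cache keyed by an arbitrary string, so that each micro-batch reuses an already-connected consumer instead of paying the creation/authentication cost again:

    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Illustrative only: a JVM-wide cache so consumers survive across micro-batches.
    public final class ConsumerCache {

      // Keyed by a string describing the consumer config (e.g. bootstrap servers + group id).
      private static final Map<String, KafkaConsumer<byte[], byte[]>> CACHE =
          new ConcurrentHashMap<>();

      private ConsumerCache() {}

      public static KafkaConsumer<byte[], byte[]> getOrCreate(String key, Properties config) {
        // computeIfAbsent initializes the consumer lazily, once per JVM, on first use;
        // the config is assumed to carry the deserializers and connection settings.
        return CACHE.computeIfAbsent(key, k -> new KafkaConsumer<byte[], byte[]>(config));
      }
    }

Note that KafkaConsumer is not thread-safe, so a real pool would need to hand out one consumer per reader/thread (or synchronize access) rather than sharing a single instance.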
On Sat, Jan 21, 2017 at 10:41 PM Xu Mingmin <[email protected]> wrote:

Thanks @Amit for the detailed design doc, that helps a lot in understanding how it works in the backend.

I think my error is probably caused by it taking too long to read from Kafka. By increasing *readTimePercentage*, it could run with a smaller *batchIntervalMillis*.

With some debugging on KafkaIO, it looks like *KafkaIO.UnboundedKafkaReader.start()* is called for every micro-batch. That explains why I need such a large interval time (larger than 25s for another test with Kafka 0.10 + external auth). I suppose start() should be invoked only once before the first micro-batch, and then kept alive. Please correct me if I misunderstand anything.

Mingmin

On Sat, Jan 21, 2017 at 1:35 AM, Amit Sela <[email protected]> wrote:

Not sure why this would cause the application to crash, but I can give some background on how the Spark runner reads micro-batches from Kafka (and UnboundedSources in general):

With each micro-batch having a pre-set interval time, the Spark runner applies bounds on the read from the source (Kafka). By default, a time bound is set (10% of the interval, but no less than 200 msec - those knobs are available in PipelineOptions), and a bound on the number of records can also be set (explicitly, or by enabling Spark's backpressure mechanism). Those bounds are meant to avoid reading forever but never processing: Spark is a micro-batch engine, so the readers have to stop at some point and "release" the read RDD as part of the DStream.

This approach is somewhat similar to the integration with Spark's direct Kafka stream in that it generates the stream's initial RDDs by itself (unlike Receivers), but it differs because of the bounds, as it tries to avoid (too long) reads that would cause delays.

If you're interested in the internals of the design of the Spark runner's read from UnboundedSources, you can find it here <https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>.

Back to your issue: since your pipeline only reads/writes and doesn't apply any time-consuming computations or shuffles, you could increase the read time percentage ("--readTimePercentage": (0, 1), defaults to 0.1).

Let me know if this helps.

Amit
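For concreteness, a hypothetical way of wiring these knobs together - the flag names are the ones mentioned in this thread, and the parsing mirrors the snippet further down; the values are purely illustrative, not recommendations:

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class SparkRunnerKnobs {
      public static void main(String[] ignored) {
        // Illustrative values only.
        String[] args = {
            "--sparkMaster=local[*]",      // all local threads instead of the default 4
            "--batchIntervalMillis=3000",  // micro-batch interval
            "--readTimePercentage=0.5",    // spend up to half the interval reading from the source
            "--maxRecordsPerBatch=100000"  // optional cap on records per micro-batch
        };
        SparkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class);
        options.setRunner(SparkRunner.class);
        options.setStreaming(true);
        // options would then be passed to Pipeline.create(options), as in the snippet further down.
      }
    }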
On Sat, Jan 21, 2017 at 12:07 AM, Xu Mingmin <[email protected]> wrote:

Thanks @Amit. I tried the parameters mentioned; it seems the core setting is '--batchIntervalMillis'. I can run the simple KafkaInKafkaOut job only with a value >= 3000, no matter the value of --maxRecordsPerBatch, or --sparkMaster=local[2] / --sparkMaster=local[*]. Here's the Kafka consumer configuration - not sure why >=3000ms works:

17/01/20 13:27:54 INFO ConsumerConfig: ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [***:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id = consumer-6
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 524288
    sasl.login.class = null
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    group.id = Reader-0_offset_consumer_676510629_none
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    sasl.callback.handler.class = null
    auto.offset.reset = latest
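One way to see how much of each interval is spent simply constructing and connecting a consumer with a configuration like the one above (for example, whether external authentication dominates, as asked earlier in the thread) is to time a bare consumer outside Beam. A rough, illustrative sketch with placeholder broker and topic names:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class ConsumerInitTiming {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker; mirror the relevant settings from the dump above as needed.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        long t0 = System.currentTimeMillis();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          consumer.subscribe(Arrays.asList("my-topic")); // hypothetical topic name
          consumer.poll(1000);                           // first poll triggers metadata fetch (and auth, if configured)
          System.out.println("first poll finished after " + (System.currentTimeMillis() - t0) + " ms");
        }
      }
    }

The first poll typically includes the metadata fetch (and authentication, where configured), so comparing it with subsequent polls gives a rough idea of the per-consumer startup cost that is paid again for each micro-batch.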
On Fri, Jan 20, 2017 at 12:19 PM, Amit Sela <[email protected]> wrote:

Beam streaming pipelines over the Spark runner are still experimental, so there's nothing official, but at PayPal we're running a version which is very similar to the Apache branch, especially since you're only executing a Kafka-in-Kafka-out pipeline.

From my experience with Spark, this usually means that something caused Spark to stop, and if there was no clear exception failing the job it might indicate insufficient resources. Have you tried setting the option "--sparkMaster local[*]", which will use the number of threads (Spark believes) your machine can provide, instead of the default of 4 set in SparkPipelineOptions? If resources are indeed the problem, you could also try increasing the batch interval (--batchIntervalMillis, default: 1000) and even bounding the records read in each micro-batch (--maxRecordsPerBatch, default: -1).

On Fri, Jan 20, 2017 at 10:08 PM Xu Mingmin <[email protected]> wrote:

It's running in local mode for testing now. I tried with 0.5.0-SNAPSHOT, with the same error:

17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver, localhost, 59305),rdd_30_0,StorageLevel(false, true, false, true, 1),11616,0,0))
17/01/20 12:01:30 INFO BlockManagerInfo: Added rdd_31_0 in memory on localhost:59305 (size: 1340.0 B, free: 1124.5 MB)
17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver, localhost, 59305),rdd_31_0,StorageLevel(false, true, false, false, 1),1340,0,0))
17/01/20 12:01:30 INFO Executor: Finished task 0.0 in stage 15.0 (TID 19). 2956 bytes result sent to driver
17/01/20 12:01:30 INFO DAGScheduler: ResultStage 15 (DStream at SparkUnboundedSource.java:154) failed in 0.564 s
17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@43f0ada9)
17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(3,1484942490827,JobFailed(org.apache.spark.SparkException: Job 3 cancelled because SparkContext was shut down))

Btw, is there a runnable example for Spark streaming that I can refer to? Thanks!

Mingmin

On Fri, Jan 20, 2017 at 11:45 AM, Amit Sela <[email protected]> wrote:

The WakeupException is being logged and not thrown (which is OK, since the reader was closed at end-of-microbatch), so I wonder what causes "ERROR StreamingListenerBus: StreamingListenerBus has already stopped". Are you running in local mode ("local[*]") or over YARN? Any specific options you're using? Would you mind trying the Beam snapshot, 0.5.0-SNAPSHOT?

Amit.
On Fri, Jan 20, 2017 at 9:20 PM Xu Mingmin <[email protected]> wrote:

Hello all,

I'm working on a streaming POC project which is written with the Beam API and runs on both FlinkRunner and SparkRunner. It works well on Flink, however I cannot run it on SparkRunner. Currently I run it locally, and get this exception:

17/01/20 11:13:49 WARN KafkaIO: Reader-0: exception while fetching latest offsets. ignored.
org.apache.kafka.common.errors.WakeupException
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:404)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
    at org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:374)
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:341)
    at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:197)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242)
    at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.updateLatestOffsets(KafkaIO.java:1059)
    at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.access$3(KafkaIO.java:1055)
    at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader$3.run(KafkaIO.java:966)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/01/20 11:13:50 ERROR StreamingListenerBus: StreamingListenerBus has already stopped! Dropping event StreamingListenerOutputOperationCompleted(OutputOperationInfo(1484939618000 ms,0,foreachRDD at UnboundedDataset.java:102,org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
    org.apache.beam.runners.spark.translation.streaming.UnboundedDataset.action(UnboundedDataset.java:102)
    org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:164)
    org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.create(SparkRunnerStreamingContextFactory.java:78)
    org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:706)
    org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:705)
    scala.Option.getOrElse(Option.scala:120)
    org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
    org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:705)
    org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
    org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:159)
    org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:81)
    org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
    com.ebay.dss.beam.spark.streaming.KafkaInKafkaOut.main(KafkaInKafkaOut.java:132),Some(1484939629500),Some(1484939630152),None))

Here's my code:

    SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(SparkPipelineOptions.class); // PipelineOptionsFactory.create();
    options.setRunner(SparkRunner.class);
    options.setStreaming(true);
    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> inStream = pipeline.apply("source",
        KafkaIO.read()
            .withBootstrapServers(KAFKA_BROKER_IN)
            .withTopics(Arrays.asList(KAFKA_IN_TOPIC))
            .withKeyCoder(ByteArrayCoder.of())
            .withValueCoder(StringUtf8Coder.of())
            // .withMaxNumRecords(5)
            .withoutMetadata()
        )...

Environment:
Apache Beam: 0.4.0
Kafka: 0.9 / 0.10 (tested both)
Spark: 1.6.3

I can run it by adding withMaxNumRecords(), however it's a one-time batch run then. Any suggestions? Thanks!

Mingmin
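As a footnote to the closing remark about withMaxNumRecords, a sketch of the same read bounded for a one-shot test; it reuses the pipeline, constants, and coders from the snippet above, and only uncomments the cap. With the cap, the source is treated as bounded, so the job reads at most five records and then finishes - handy for smoke tests, but no longer a continuous stream:

    // Illustrative only: same source as above, but bounded for a one-shot test run.
    pipeline.apply("boundedSource",
        KafkaIO.read()
            .withBootstrapServers(KAFKA_BROKER_IN)
            .withTopics(Arrays.asList(KAFKA_IN_TOPIC))
            .withKeyCoder(ByteArrayCoder.of())
            .withValueCoder(StringUtf8Coder.of())
            .withMaxNumRecords(5) // makes the read bounded: read 5 records, then stop
            .withoutMetadata());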
