You're right that in current implementation the Kafka consumer is being
initialized for each micro-batch, which is not different from direct Kafka
in native Spark (it is so because RDDs and their execution context live as
long as the batch is processed, an implementation baed on receivers would
be long lived but has resilience issues).

I wonder why you would require 25s with Kafka 0.10 + external auth - is the
authentication taking long ?
Setting auth. aside, I did notice a funny behaviour where although a new
consumer is created each microbatch, the first one takes far longer then
the following, which is actually good.

One way to improve init. in case of a large overhead could be to replace
the creation of Readers with a lazy init. connection pool via broadcast
variables - filed BEAM-1294
<https://issues.apache.org/jira/browse/BEAM-1294>.

Amit.

On Sat, Jan 21, 2017 at 10:41 PM Xu Mingmin <[email protected]> wrote:

Thanks @Amit for the detailed design doc, that helps a lot to understand
how it works in the backend.

I think my error is probably caused due to it takes too long to read from
Kafka. By increasing *readTimePercentage*, it could run with smaller
*batchIntervalMillis*.

With some debug work on KafkaIO, looks like
*KafkaIO.UnboundedKafkaReader.start()* is called by every micro-batch. That
makes sense why I need such a large interval time(larger than 25s for other
test with kafka0.10+external_auth). I suppose start() should be invoked
only once before the first microbatch, and keep alive.  Please correct me
if any misunderstanding.

Mingmin

On Sat, Jan 21, 2017 at 1:35 AM, Amit Sela <[email protected]> wrote:

Not sure why this would cause the application to crash, but I can give some
background about how the Spark runner reads microbatches from Kafka (and
generally UnboundedSources):

With each microbatch having a pre-set interval time, the Spark runner would
apply bounds on the read from the source (Kafka).
By default, a time bound is set (10% of the interval, but no less then 200
msec - those knobs are available in PipelineOptions), and a bound on the
number of records can also be set (explicitly or by enabling Spark's
backpressure mechanism).
Those bounds are meant to avoid reading-forever-but-never-processing as
Spark is a microbatch engine and so the readers have to stop at some point
and "release" the read RDD as part of the DStream.
This approach is somewhat similar to the integration with Spark's direct
Kafka by generating the stream's initial RDDs by itself (unlike Receivers)
but it's different because of the bounds, as it tries to avoid (too long)
reads that will cause delays.
If you're interested more in the internals of the design of Spark's read
from UnboundedSources you can find it here
<https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>
.

Back to your issue, since your pipeline only reads/writes and doesn't apply
any time-consuming computations or shuffles, you could increase the read
time percentage ("--readTimePercentage": (0, 1) defaults to 0.1).

Let me know if this helps.

Amit

On Sat, Jan 21, 2017 at 12:07 AM Xu Mingmin <[email protected]> wrote:

Thanks @Amit.

I tried the parameters mentioned, seems the core setting is
'--batchIntervalMillis'. I can run the simple KafkaInKafkaOut job with
value >= 3000 only, no matter value for --maxRecordsPerBatch, or
--sparkMaster=local[2] / --sparkMaster=local[*].

Here's the kafka consumer configuration, not sure why >=3000ms works.

17/01/20 13:27:54 INFO ConsumerConfig: ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy =
[org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [***:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id = consumer-6
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647 <(214)%20748-3647>
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 524288
    sasl.login.class = null
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
    group.id = Reader-0_offset_consumer_676510629_none
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class
org.apache.kafka.common.serialization.ByteArrayDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    sasl.callback.handler.class = null
    auto.offset.reset = latest




On Fri, Jan 20, 2017 at 12:19 PM, Amit Sela <[email protected]> wrote:

Beam streaming pipelines over the spark runner are still experimental so
there's nothing official, but at PayPal we're running a version which is
very similar to the Apache branch, especially if your only executing
Kafka-in-Kafka-out pipeline.

>From my experience with Spark, this usually means that something caused
Spark to stop, and in case there was no clear exception failing the job it
might indicate insufficient resources.

Have you tried setting the option: "--sparkMaster local[*]" which will use
the number of threads (spark believes) your machine can provide instead of
the default 4 set in SparkPipelineOptions.
If resources are indeed the problem you could also try increasing the batch
interval (--batchIntervalMillis) (default: 1000) and even bounding the
records read in each microbatch ("--maxRecordsPerBatch") (default: -1).

On Fri, Jan 20, 2017 at 10:08 PM Xu Mingmin <[email protected]> wrote:

it's running in local-mode for test now. I tried with 0.5.0-SNAPSHOT, with
the same error:

17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already
stopped! Dropping event
SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver,
localhost, 59305),rdd_30_0,StorageLevel(false, true, false, true,
1),11616,0,0))
17/01/20 12:01:30 INFO BlockManagerInfo: Added rdd_31_0 in memory on
localhost:59305 (size: 1340.0 B, free: 1124.5 MB)
17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already
stopped! Dropping event
SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver,
localhost, 59305),rdd_31_0,StorageLevel(false, true, false, false,
1),1340,0,0))
17/01/20 12:01:30 INFO Executor: Finished task 0.0 in stage 15.0 (TID 19).
2956 bytes result sent to driver
17/01/20 12:01:30 INFO DAGScheduler: ResultStage 15 (DStream at
SparkUnboundedSource.java:154) failed in 0.564 s
17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already
stopped! Dropping event
SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@43f0ada9)
17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already
stopped! Dropping event
SparkListenerJobEnd(3,1484942490827,JobFailed(org.apache.spark.SparkException:
Job 3 cancelled because SparkContext was shut down))


Btw, is there a runnable example for Spark streaming so I can refer to?

Thanks!
Mingmin

On Fri, Jan 20, 2017 at 11:45 AM, Amit Sela <[email protected]> wrote:

The WakeupException is being logged and not thrown (it is OK since the
reader was closed due to end-of-microbatch), so I wonder what causes "ERROR
StreamingListenerBus: StreamingListenerBus has already stopped".

Are you running in local-mode ("local[*]") ? or over YARN ?
Any specific options you're using ?
Would you mind trying the Beam Snapshot ? 0.5.0-SNAPSHOT

Amit.

On Fri, Jan 20, 2017 at 9:20 PM Xu Mingmin <[email protected]> wrote:

Hello all,

I'm working on a streaming POC project, which is written with Beam API, and
run on both FlinkRunner and SparkRunner. It works good on Flink, however I
cannot run it on SparkRunner.

Currently I run it locally, and get this exception:

17/01/20 11:13:49 WARN KafkaIO: Reader-0: exception while fetching latest
offsets. ignored.
org.apache.kafka.common.errors.WakeupException
    at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:404)
    at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
    at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
    at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
    at
org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:374)
    at
org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:341)
    at
org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:197)
    at
org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
    at
org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242)
    at
com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.updateLatestOffsets(KafkaIO.java:1059)
    at
com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.access$3(KafkaIO.java:1055)
    at
com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader$3.run(KafkaIO.java:966)
    at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

17/01/20 11:13:50 ERROR StreamingListenerBus: StreamingListenerBus has
already stopped! Dropping event
StreamingListenerOutputOperationCompleted(OutputOperationInfo(1484939618000
ms,0,foreachRDD at
UnboundedDataset.java:102,org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
org.apache.beam.runners.spark.translation.streaming.UnboundedDataset.action(UnboundedDataset.java:102)
org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:164)
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.create(SparkRunnerStreamingContextFactory.java:78)
org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:706)
org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:705)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:705)
org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:159)
org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:81)
org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
com.ebay.dss.beam.spark.streaming.KafkaInKafkaOut.main(KafkaInKafkaOut.java:132),Some(1484939629500),Some(1484939630152),None))


Here's my code:

SparkPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(SparkPipelineOptions.class);// PipelineOptionsFactory.create();

options.setRunner(SparkRunner.class);
options.setStreaming(true);

Pipeline pipeline = Pipeline.create(options);

PCollection<String> inStream = pipeline.apply("source",

KafkaIO.read().withBootstrapServers(KAFKA_BROKER_IN).withTopics(Arrays.asList(KAFKA_IN_TOPIC))
                .withKeyCoder(ByteArrayCoder.of())
                .withValueCoder(StringUtf8Coder.of())
//                .withMaxNumRecords(5)
                .withoutMetadata()
                )...


Environment:

Apache-Beam : 0.4.0
Kafka: 0.9/0.10 (test both)
Spark: 1.6.3


I can run it by adding withMaxNumRecords(), however it's batch-onetime
then.

Any suggestion?

Thanks!
Mingmin

Reply via email to