I ran your example with the same versions of Kafka and Spark that you are
using, against a standalone cluster.  This is what I observed:

(in kafka working directory)

bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list 'localhost:9092' --topic simple_logtest --time -2
simple_logtest:2:0
simple_logtest:4:0
simple_logtest:1:0
simple_logtest:3:0
simple_logtest:0:0

bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
    --broker-list 'localhost:9092' --topic simple_logtest --time -1
simple_logtest:2:31
simple_logtest:4:31
simple_logtest:1:31
simple_logtest:3:31
simple_logtest:0:31

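So in other words, there are 5 partitions, and they all have messages in
them: each one runs from offset 0 (--time -2, the earliest offset) to
offset 31 (--time -1, the latest offset).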
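
If you want to do that comparison mechanically rather than by eye, here is
a minimal sketch that turns the two GetOffsetShell listings into a message
count per partition. OffsetShellOutput, parse and messageCounts are names
I made up for illustration; only the "topic:partition:offset" line format
comes from the tool output above.

```scala
// Sketch: derive per-partition message counts from two GetOffsetShell listings.
// OffsetShellOutput and its method names are hypothetical, not part of Kafka.
object OffsetShellOutput {

  // Each non-empty line looks like "topic:partition:offset".
  def parse(output: String): Map[Int, Long] =
    output.split("\n").map(_.trim).filter(_.nonEmpty).map { line =>
      val fields = line.split(":")
      fields(fields.length - 2).toInt -> fields.last.toLong
    }.toMap

  // Retained messages = latest offset minus earliest offset.
  // A partition missing from the earliest listing is assumed to start at 0.
  def messageCounts(earliest: Map[Int, Long], latest: Map[Int, Long]): Map[Int, Long] =
    latest.map { case (partition, end) =>
      partition -> (end - earliest.getOrElse(partition, 0L))
    }

  def main(args: Array[String]): Unit = {
    // Listings copied from the --time -2 and --time -1 runs above.
    val earliest = parse(
      """simple_logtest:2:0
        |simple_logtest:4:0
        |simple_logtest:1:0
        |simple_logtest:3:0
        |simple_logtest:0:0""".stripMargin)
    val latest = parse(
      """simple_logtest:2:31
        |simple_logtest:4:31
        |simple_logtest:1:31
        |simple_logtest:3:31
        |simple_logtest:0:31""".stripMargin)

    messageCounts(earliest, latest).toSeq.sortBy(_._1).foreach {
      case (partition, count) => println(s"partition $partition: $count messages")
    }
  }
}
```

A partition whose count comes out as 0 has never received data (or has had
everything aged out), which is worth checking on your cluster before
looking at the Spark side.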

(in spark working directory)

bash-3.2$ ./bin/spark-submit \
    --master spark://Codys-MacBook-Pro.local:7077 \
    --class example.SimpleKafkaLoggingDriver \
    /private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar \
    localhost:9092 simple_logtest mygroup earliest


16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job 1479574025000 ms.0 from job set of time 1479574025000 ms

simple_logtest 3 offsets: 0 to 31
simple_logtest 0 offsets: 0 to 31
simple_logtest 1 offsets: 0 to 31
simple_logtest 2 offsets: 0 to 31
simple_logtest 4 offsets: 0 to 31

16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job 1479574025000 ms.0 from job set of time 1479574025000 ms
16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time 1479574025000 ms (execution: 0.005 s)
16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job 1479574030000 ms.0 from job set of time 1479574030000 ms

simple_logtest 3 offsets: 31 to 31
simple_logtest 0 offsets: 31 to 31
simple_logtest 1 offsets: 31 to 31
simple_logtest 2 offsets: 31 to 31
simple_logtest 4 offsets: 31 to 31

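So in other words, Spark is indeed seeing offsets for each partition: the
first batch reads 0 to 31 on all five, and the next batch is empty (31 to
31) because nothing new has been produced.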


The results you posted suggest that no messages are going into the other
partitions, which points to a misbehaving producer.
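
I don't know what your producer actually does, so take this only as an
illustration of one common way to end up with a single busy partition:
sending every record with the same key. The sketch below is a simplified
stand-in for a key-hashing partitioner (Kafka's default partitioner hashes
the serialized key with murmur2; String.hashCode is used here purely to
keep the example dependency-free). KeyedPartitioningSketch and its method
names are made up.

```scala
// Sketch: why a constant record key funnels everything into one partition.
// This is NOT Kafka's partitioner; it only mimics the "hash(key) % partitions" idea.
object KeyedPartitioningSketch {

  // Mask off the sign bit so the modulo result is never negative.
  def partitionFor(key: String, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  // Number of records that land in each partition for the given keys.
  def distribution(keys: Seq[String], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(partitionFor(_, numPartitions)).map { case (p, ks) => p -> ks.size }

  def main(args: Array[String]): Unit = {
    val constantKey = Seq.fill(100)("same-key")         // every record shares one key
    val variedKeys  = (1 to 100).map(i => s"key-$i")    // keys differ per record

    println(s"constant key: ${distribution(constantKey, 5)}")
    println(s"varied keys:  ${distribution(variedKeys, 5)}")
  }
}
```

With a constant key all 100 records map to a single partition, while
varied keys spread across several. If your producer sets a key (or an
explicit partition number), that is the first thing I would check.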

On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri
<[email protected]> wrote:
> Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have been
> struggling with this show-stopper problem.
>
> When we run our drivers with auto.offset.reset=latest ingesting from a
> single kafka topic with 10 partitions, the driver reads correctly from all
> 10 partitions.
>
> However when we use auto.offset.reset=earliest, the driver will read only a
> single partition.
>
> When we turn on the debug logs, we sometimes see partitions being set to
> different offset configuration even though the consumer config correctly
> indicates auto.offset.reset=earliest.
>
>> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} to broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} to broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} from broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} from broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 (org.apache.kafka.clients.consumer.internals.Fetcher)
>
>
>
> I've enclosed below the completely stripped-down, trivial test driver that
> shows this behavior. We normally run with YARN 2.7.3 but have also tried
> running Spark in standalone mode, which has the same behavior. Our drivers
> are normally Java, but we have tried the Scala version, which also has the
> same incorrect behavior. We have tried different LocationStrategies and
> partition assignment strategies, all without success.  Any insight would be
> greatly appreciated.
>
> package com.xxxxx.labs.analytics.diagnostics.spark.drivers
>
> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.LocationStrategies
> import org.apache.spark.streaming.kafka010.ConsumerStrategies
>
>
> /**
>   * This driver is only for pulling data from the stream and logging to
>   * output, just to isolate the single partition bug.
>   */
> object SimpleKafkaLoggingDriver {
>   def main(args: Array[String]): Unit = {
>     if (args.length != 4) {
>       System.err.println("Usage: SimpleTestDriver <broker bootstrap servers> <topic> <groupId> <offsetReset>")
>       System.exit(1)
>     }
>
>     val Array(brokers, topic, groupId, offsetReset) = args
>     val preferredHosts = LocationStrategies.PreferConsistent
>     val topics = List(topic)
>
>     val kafkaParams = Map(
>       "bootstrap.servers" -> brokers,
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[StringDeserializer],
>       "group.id" -> groupId,
>       "auto.offset.reset" -> offsetReset,
>       "enable.auto.commit" -> (false: java.lang.Boolean)
>     )
>
>     val sparkConf = new SparkConf().setAppName("SimpleTestDriver" + "_" + topic)
>     val streamingContext = new StreamingContext(sparkConf, Seconds(5))
>
>
>     val dstream = KafkaUtils.createDirectStream[String, String](
>       streamingContext,
>       preferredHosts,
>       ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
>
>     dstream.foreachRDD { rdd =>
>       // Get the offset ranges in the RDD and log
>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       for (o <- offsetRanges) {
>         println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
>       }
>     }
>
>     streamingContext.start()
>     streamingContext.awaitTermination()
>
>   }
>
> }
>
>
>
>> 16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
>> auto.commit.interval.ms = 5000
>> auto.offset.reset = earliest
>> bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]
>> check.crcs = true
>> client.id =
>> connections.max.idle.ms = 540000
>> enable.auto.commit = false
>> exclude.internal.topics = true
>> fetch.max.bytes = 52428800
>> fetch.max.wait.ms = 500
>> fetch.min.bytes = 1
>> group.id = simple_test_group
>> heartbeat.interval.ms = 3000
>> interceptor.classes = null
>> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>> max.partition.fetch.bytes = 1048576
>> max.poll.interval.ms = 300000
>> max.poll.records = 500
>> metadata.max.age.ms = 300000
>> metric.reporters = []
>> metrics.num.samples = 2
>> metrics.sample.window.ms = 30000
>> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>> receive.buffer.bytes = 65536
>> reconnect.backoff.ms = 50
>> request.timeout.ms = 305000
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.min.time.before.relogin = 60000
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> sasl.mechanism = GSSAPI
>> security.protocol = PLAINTEXT
>> send.buffer.bytes = 131072
>> session.timeout.ms = 10000
>> ssl.cipher.suites = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.endpoint.identification.algorithm = null
>> ssl.key.password = null
>> ssl.keymanager.algorithm = SunX509
>> ssl.keystore.location = null
>> ssl.keystore.password = null
>> ssl.keystore.type = JKS
>> ssl.protocol = TLS
>> ssl.provider = null
>> ssl.secure.random.implementation = null
>> ssl.trustmanager.algorithm = PKIX
>> ssl.truststore.location = null
>> ssl.truststore.password = null
>> ssl.truststore.type = JKS
>> value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>
>
>
> Below is the output of the above driver for a 5-partition topic.  Offsets
> always remain 0 for all but a single partition, in this case partition 3.
>
> simple_logtest 3 offsets: 1623531 to 1623531
> simple_logtest 0 offsets: 0 to 0
> simple_logtest 1 offsets: 0 to 0
> simple_logtest 2 offsets: 0 to 0
> simple_logtest 4 offsets: 0 to 0
> simple_logtest 3 offsets: 1623531 to 1623531
> simple_logtest 0 offsets: 0 to 0
> simple_logtest 1 offsets: 0 to 0
> simple_logtest 2 offsets: 0 to 0
> simple_logtest 4 offsets: 0 to 0
>
> simple_logtest 3 offsets: 1623531 to 1623531
>
>
>
>
