I ran your example against a standalone cluster, using the same versions of Kafka and Spark you are using. This is what I observed:

(in kafka working directory)

bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 'localhost:9092' --topic simple_logtest --time -2
simple_logtest:2:0
simple_logtest:4:0
simple_logtest:1:0
simple_logtest:3:0
simple_logtest:0:0

bash-3.2$ ./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 'localhost:9092' --topic simple_logtest --time -1
simple_logtest:2:31
simple_logtest:4:31
simple_logtest:1:31
simple_logtest:3:31
simple_logtest:0:31

So in other words, there are 5 partitions, they all have messages in them.

(in spark working directory)

bash-3.2$ ./bin/spark-submit --master spark://Codys-MacBook-Pro.local:7077 --class example.SimpleKafkaLoggingDriver /private/var/tmp/kafka-bug-report/target/scala-2.11/kafka-example-assembly-2.0.0.jar localhost:9092 simple_logtest mygroup earliest

16/11/19 10:47:05 INFO JobScheduler: Starting job streaming job 1479574025000 ms.0 from job set of time 1479574025000 ms
simple_logtest 3 offsets: 0 to 31
simple_logtest 0 offsets: 0 to 31
simple_logtest 1 offsets: 0 to 31
simple_logtest 2 offsets: 0 to 31
simple_logtest 4 offsets: 0 to 31
16/11/19 10:47:05 INFO JobScheduler: Finished job streaming job 1479574025000 ms.0 from job set of time 1479574025000 ms
16/11/19 10:47:05 INFO JobScheduler: Total delay: 0.172 s for time 1479574025000 ms (execution: 0.005 s)
16/11/19 10:47:05 INFO ReceivedBlockTracker: Deleting batches:
16/11/19 10:47:05 INFO InputInfoTracker: remove old batch metadata:
16/11/19 10:47:10 INFO JobScheduler: Added jobs for time 1479574030000 ms
16/11/19 10:47:10 INFO JobScheduler: Starting job streaming job 1479574030000 ms.0 from job set of time 1479574030000 ms
simple_logtest 3 offsets: 31 to 31
simple_logtest 0 offsets: 31 to 31
simple_logtest 1 offsets: 31 to 31
simple_logtest 2 offsets: 31 to 31
simple_logtest 4 offsets: 31 to 31

So in other words, spark is indeed seeing offsets for each partition.

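If you want to run the same check against your own cluster, here is a rough sketch that turns the two GetOffsetShell outputs (--time -2 and --time -1) into a message count per partition. The PartitionCounts object and its method names are my own and hypothetical, not part of Kafka or Spark, and it assumes each output line has the form topic:partition:offset with a single offset per line, as in the runs above.

```scala
// Sketch only: PartitionCounts is a hypothetical helper, not a Kafka or Spark API.
object PartitionCounts {

  /** Parse GetOffsetShell lines of the form topic:partition:offset into partition -> offset. */
  def parseOffsets(output: String): Map[Int, Long] =
    output.split("\n").map(_.trim).filter(_.nonEmpty).map { line =>
      val parts = line.split(":")
      // Topic names cannot contain ':', so the last two fields are partition and offset.
      parts(parts.length - 2).toInt -> parts(parts.length - 1).toLong
    }.toMap

  /** Messages currently held per partition: latest offset minus earliest offset. */
  def counts(earliest: String, latest: String): Map[Int, Long] = {
    val start = parseOffsets(earliest)
    parseOffsets(latest).map { case (partition, end) =>
      partition -> (end - start.getOrElse(partition, 0L))
    }
  }

  def main(args: Array[String]): Unit = {
    // Paste the output of the --time -2 run and the --time -1 run here.
    val earliest = "simple_logtest:2:0\nsimple_logtest:4:0\nsimple_logtest:1:0\nsimple_logtest:3:0\nsimple_logtest:0:0"
    val latest = "simple_logtest:2:31\nsimple_logtest:4:31\nsimple_logtest:1:31\nsimple_logtest:3:31\nsimple_logtest:0:31"
    counts(earliest, latest).toSeq.sortBy(_._1).foreach { case (partition, n) =>
      println(s"partition $partition: $n messages")
    }
  }
}
```

With the output from my runs above, every partition comes out at 31 messages. A partition that comes out at 0 on your cluster has nothing for the consumer to read, whatever auto.offset.reset is set to.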
The results you posted look to me like there aren't any messages going into the other partitions, which looks like a misbehaving producer.

On Thu, Nov 17, 2016 at 5:58 PM, Hster Geguri <[email protected]> wrote:
> Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have been
> struggling with this show stopper problem.
>
> When we run our drivers with auto.offset.reset=latest ingesting from a
> single kafka topic with 10 partitions, the driver reads correctly from all
> 10 partitions.
>
> However when we use auto.offset.reset=earliest, the driver will read only a
> single partition.
>
> When we turn on the debug logs, we sometimes see partitions being set to
> different offset configuration even though the consumer config correctly
> indicates auto.offset.reset=earliest.
>
>> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset.
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset.
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 TRACE Sending ListOffsetRequest
>> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
>> to broker 10.102.20.12:9092 (id: 12 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 TRACE Sending ListOffsetRequest
>> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
>> to broker 10.102.20.13:9092 (id: 13 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 TRACE Received ListOffsetResponse
>> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
>> from broker 10.102.20.12:9092 (id: 12 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 TRACE Received ListOffsetResponse
>> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
>> from broker 10.102.20.13:9092 (id: 13 rack: null)
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9
>> (org.apache.kafka.clients.consumer.internals.Fetcher)
>
> I've enclosed below the completely stripped down trivial test driver that
> shows this behavior. We normally run with YARN 2.7.3 but have also tried
> running spark standalone mode which has the same behavior. Our drivers are
> normally java but we have tried the scala version which also has the same
> incorrect behavior. We have tried different LocationStrategies and partition
> assignment strategies all without success. Any insight would be greatly
> appreciated.
>
> package com.xxxxx.labs.analytics.diagnostics.spark.drivers
>
> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.LocationStrategies
> import org.apache.spark.streaming.kafka010.ConsumerStrategies
>
>
> /**
>   *
>   * This driver is only for pulling data from the stream and logging to output just to isolate single partition bug
>   */
> object SimpleKafkaLoggingDriver {
>   def main(args: Array[String]) {
>     if (args.length != 4) {
>       System.err.println("Usage: SimpleTestDriver <broker bootstrap servers> <topic> <groupId> <offsetReset>")
>       System.exit(1)
>     }
>
>     val Array(brokers, topic, groupId, offsetReset) = args
>     val preferredHosts = LocationStrategies.PreferConsistent
>     val topics = List(topic)
>
>     val kafkaParams = Map(
>       "bootstrap.servers" -> brokers,
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[StringDeserializer],
>       "group.id" -> groupId,
>       "auto.offset.reset" -> offsetReset,
>       "enable.auto.commit" -> (false: java.lang.Boolean)
>     )
>
>     val sparkConf = new SparkConf().setAppName("SimpleTestDriver"+"_"+topic)
>     val streamingContext = new StreamingContext(sparkConf, Seconds(5))
>
>
>     val dstream = KafkaUtils.createDirectStream[String, String](
>       streamingContext,
>       preferredHosts,
>       ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
>
>     dstream.foreachRDD { rdd =>
>       // Get the offset ranges in the RDD and log
>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       for (o <- offsetRanges) {
>         println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
>       }
>     }
>
>     streamingContext.start
>     streamingContext.awaitTermination()
>
>   }
>
> }
>
>
>> 16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
>>
>> auto.commit.interval.ms = 5000
>> auto.offset.reset = earliest
>> bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]
>> check.crcs = true
>> client.id =
>> connections.max.idle.ms = 540000
>> enable.auto.commit = false
>> exclude.internal.topics = true
>> fetch.max.bytes = 52428800
>> fetch.max.wait.ms = 500
>> fetch.min.bytes = 1
>> group.id = simple_test_group
>> heartbeat.interval.ms = 3000
>> interceptor.classes = null
>> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>> max.partition.fetch.bytes = 1048576
>> max.poll.interval.ms = 300000
>> max.poll.records = 500
>> metadata.max.age.ms = 300000
>> metric.reporters = []
>> metrics.num.samples = 2
>> metrics.sample.window.ms = 30000
>> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
>> receive.buffer.bytes = 65536
>> reconnect.backoff.ms = 50
>> request.timeout.ms = 305000
>> retry.backoff.ms = 100
>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> sasl.kerberos.min.time.before.relogin = 60000
>> sasl.kerberos.service.name = null
>> sasl.kerberos.ticket.renew.jitter = 0.05
>> sasl.kerberos.ticket.renew.window.factor = 0.8
>> sasl.mechanism = GSSAPI
>> security.protocol = PLAINTEXT
>> send.buffer.bytes = 131072
>> session.timeout.ms = 10000
>> ssl.cipher.suites = null
>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> ssl.endpoint.identification.algorithm = null
>> ssl.key.password = null
>> ssl.keymanager.algorithm = SunX509
>> ssl.keystore.location = null
>> ssl.keystore.password = null
>> ssl.keystore.type = JKS
>> ssl.protocol = TLS
>> ssl.provider = null
>> ssl.secure.random.implementation = null
>> ssl.trustmanager.algorithm = PKIX
>> ssl.truststore.location = null
>> ssl.truststore.password = null
>> ssl.truststore.type = JKS
>> value.deserializer = class
>> org.apache.kafka.common.serialization.StringDeserializer
>
> Below is the output of above driver for 5 partition topic. Offsets always
> remain 0 for all but a single partition in this case 3
>
> simple_logtest 3 offsets: 1623531 to 1623531
> simple_logtest 0 offsets: 0 to 0
> simple_logtest 1 offsets: 0 to 0
> simple_logtest 2 offsets: 0 to 0
> simple_logtest 4 offsets: 0 to 0
> simple_logtest 3 offsets: 1623531 to 1623531
> simple_logtest 0 offsets: 0 to 0
> simple_logtest 1 offsets: 0 to 0
> simple_logtest 2 offsets: 0 to 0
> simple_logtest 4 offsets: 0 to 0
>
> simple_logtest 3 offsets: 1623531 to 1623531
