it seems the bug is: https://issues.apache.org/jira/browse/KAFKA-4547

i would advise everyone not to use kafka-clients 0.10.0.2, 0.10.1.0 or 0.10.1.1
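in case it helps anyone hitting the same thing, here is a minimal build.sbt sketch for pinning the client to the 0.10.0.1 version that worked below. the sbt build and the Spark 2.1.0 connector coordinates are assumptions on my part; the only point is the kafka-clients override:

// build.sbt (sketch): keep kafka-clients off the affected 0.10.0.2 / 0.10.1.x versions
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0",
  "org.apache.kafka"  % "kafka-clients"        % "0.10.0.1"
)

// also override the kafka-clients version the spark connector pulls in transitively
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.0.1"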
On Fri, Jan 27, 2017 at 3:56 PM, Koert Kuipers <ko...@tresata.com> wrote:

> in case anyone else runs into this:
>
> the issue is that i was using kafka-clients 0.10.1.1
>
> it works when i use kafka-clients 0.10.0.1 with spark structured streaming
>
> my kafka server is 0.10.1.1
>
> On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> i checked my topic. it has 5 partitions but all the data is written to a
>> single partition: wikipedia-2
>> i turned on debug logging and i see this:
>>
>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2, wikipedia-1]. Seeking to the end.
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-1
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-0 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-0 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-4 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-4 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-4
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-3 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received successful heartbeat response for group spark-kafka-source-fac4f749-fd56-4a32-82c7-e687aadf520b-1923704552-driver-0
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-3 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-3
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-2 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-2 to earliest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-2
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-1 to latest offset.
>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-1
>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>>
>> what is confusing to me is this:
>> Resetting offset for partition wikipedia-2 to latest offset.
>> Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
>> Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>>
>> why does it find latest offset 152908 for wikipedia-2 but then sets latest offset to 0 for that partition? or am i misunderstanding?
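to make that seekToEnd/position behaviour concrete, here is a rough standalone sketch (not the actual KafkaSource code; the object name is made up, the topic and bootstrap server are the ones from this thread) that asks the broker for latest offsets the same way: assign all partitions, seek to the end, then read position(). with one of the affected 0.10.1.x clients, position() apparently can come back as 0 here even though the Fetcher already resolved the real log-end offset, which would match the 152908-vs-0 behaviour in the log above:

import java.util.Properties
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object LatestOffsetsCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "somenode:9092") // broker from this thread
    props.put("enable.auto.commit", "false")
    props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      // manually assign all partitions of the topic, like the driver-side consumer does
      val partitions = consumer.partitionsFor("wikipedia").asScala
        .map(pi => new TopicPartition(pi.topic, pi.partition))
      consumer.assign(partitions.asJava)

      // seekToEnd only marks the positions for reset; the actual lookup happens in position()
      consumer.seekToEnd(partitions.asJava)
      partitions.foreach { tp =>
        println(s"$tp -> latest offset ${consumer.position(tp)}")
      }
    } finally {
      consumer.close()
    }
  }
}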
>>
>> On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> code:
>>>
>>> import org.apache.spark.sql.functions.col
>>> import org.apache.spark.sql.types.StringType
>>> import org.apache.spark.sql.streaming.OutputMode
>>>
>>> // assumes an existing SparkSession named spark
>>> val query = spark.readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "somenode:9092")
>>>   .option("subscribe", "wikipedia")
>>>   .load
>>>   .select(col("value") cast StringType)
>>>   .writeStream
>>>   .format("console")
>>>   .outputMode(OutputMode.Append)
>>>   .start()
>>>
>>> while (true) {
>>>   Thread.sleep(10000)
>>>   println(query.lastProgress)
>>> }
>>>
>>> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <alons...@gmail.com> wrote:
>>>
>>>> lets see the code...
>>>>
>>>> Alonso Isidoro Roman
>>>> https://about.me/alonso.isidoro.roman
>>>>
>>>> 2017-01-27 5:56 GMT+01:00 Koert Kuipers <ko...@tresata.com>:
>>>>
>>>>> my little program prints out query.lastProgress every 10 seconds, and
>>>>> this is what it shows:
>>>>>
>>>>> {
>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>   "name" : "wiki",
>>>>>   "timestamp" : "2017-01-26T22:54:45.732Z",
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : { "getOffset" : 9, "triggerExecution" : 10 },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>     "startOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>     "endOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : { "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9" }
>>>>> }
>>>>> {
>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>   "name" : "wiki",
>>>>>   "timestamp" : "2017-01-26T22:54:55.745Z",
>>>>>   "numInputRows" : 0,
>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>   "durationMs" : { "getOffset" : 5, "triggerExecution" : 5 },
>>>>>   "stateOperators" : [ ],
>>>>>   "sources" : [ {
>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>     "startOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>     "endOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>     "numInputRows" : 0,
>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>     "processedRowsPerSecond" : 0.0
>>>>>   } ],
>>>>>   "sink" : { "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9" }
>>>>> }
"processedRowsPerSecond" : 0.0 >>>>> } ], >>>>> "sink" : { >>>>> "description" : "org.apache.spark.sql.executio >>>>> n.streaming.ConsoleSink@4818d2d9" >>>>> } >>>>> } >>>>> { >>>>> "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0", >>>>> "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099", >>>>> "name" : "wiki", >>>>> "timestamp" : "2017-01-26T22:55:05.748Z", >>>>> "numInputRows" : 0, >>>>> "inputRowsPerSecond" : 0.0, >>>>> "processedRowsPerSecond" : 0.0, >>>>> "durationMs" : { >>>>> "getOffset" : 5, >>>>> "triggerExecution" : 5 >>>>> }, >>>>> "stateOperators" : [ ], >>>>> "sources" : [ { >>>>> "description" : "KafkaSource[Subscribe[wikipedia]]", >>>>> "startOffset" : { >>>>> "wikipedia" : { >>>>> "2" : 0, >>>>> "4" : 0, >>>>> "1" : 0, >>>>> "3" : 0, >>>>> "0" : 0 >>>>> } >>>>> }, >>>>> "endOffset" : { >>>>> "wikipedia" : { >>>>> "2" : 0, >>>>> "4" : 0, >>>>> "1" : 0, >>>>> "3" : 0, >>>>> "0" : 0 >>>>> } >>>>> }, >>>>> "numInputRows" : 0, >>>>> "inputRowsPerSecond" : 0.0, >>>>> "processedRowsPerSecond" : 0.0 >>>>> } ], >>>>> "sink" : { >>>>> "description" : "org.apache.spark.sql.executio >>>>> n.streaming.ConsoleSink@4818d2d9" >>>>> } >>>>> } >>>>> { >>>>> "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0", >>>>> "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099", >>>>> "name" : "wiki", >>>>> "timestamp" : "2017-01-26T22:55:15.758Z", >>>>> "numInputRows" : 0, >>>>> "inputRowsPerSecond" : 0.0, >>>>> "processedRowsPerSecond" : 0.0, >>>>> "durationMs" : { >>>>> "getOffset" : 4, >>>>> "triggerExecution" : 4 >>>>> }, >>>>> "stateOperators" : [ ], >>>>> "sources" : [ { >>>>> "description" : "KafkaSource[Subscribe[wikipedia]]", >>>>> "startOffset" : { >>>>> "wikipedia" : { >>>>> "2" : 0, >>>>> "4" : 0, >>>>> "1" : 0, >>>>> "3" : 0, >>>>> "0" : 0 >>>>> } >>>>> }, >>>>> "endOffset" : { >>>>> "wikipedia" : { >>>>> "2" : 0, >>>>> "4" : 0, >>>>> "1" : 0, >>>>> "3" : 0, >>>>> "0" : 0 >>>>> } >>>>> }, >>>>> "numInputRows" : 0, >>>>> "inputRowsPerSecond" : 0.0, >>>>> "processedRowsPerSecond" : 0.0 >>>>> } ], >>>>> "sink" : { >>>>> "description" : "org.apache.spark.sql.executio >>>>> n.streaming.ConsoleSink@4818d2d9" >>>>> } >>>>> } >>>>> { >>>>> "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0", >>>>> "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099", >>>>> "name" : "wiki", >>>>> "timestamp" : "2017-01-26T22:55:25.760Z", >>>>> "numInputRows" : 0, >>>>> "inputRowsPerSecond" : 0.0, >>>>> "processedRowsPerSecond" : 0.0, >>>>> "durationMs" : { >>>>> "getOffset" : 4, >>>>> "triggerExecution" : 4 >>>>> }, >>>>> "stateOperators" : [ ], >>>>> "sources" : [ { >>>>> "description" : "KafkaSource[Subscribe[wikipedia]]", >>>>> "startOffset" : { >>>>> "wikipedia" : { >>>>> "2" : 0, >>>>> "4" : 0, >>>>> "1" : 0, >>>>> "3" : 0, >>>>> "0" : 0 >>>>> } >>>>> }, >>>>> "endOffset" : { >>>>> "wikipedia" : { >>>>> "2" : 0, >>>>> "4" : 0, >>>>> "1" : 0, >>>>> "3" : 0, >>>>> "0" : 0 >>>>> } >>>>> }, >>>>> "numInputRows" : 0, >>>>> "inputRowsPerSecond" : 0.0, >>>>> "processedRowsPerSecond" : 0.0 >>>>> } ], >>>>> "sink" : { >>>>> "description" : "org.apache.spark.sql.executio >>>>> n.streaming.ConsoleSink@4818d2d9" >>>>> } >>>>> } >>>>> { >>>>> "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0", >>>>> "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099", >>>>> "name" : "wiki", >>>>> "timestamp" : "2017-01-26T22:55:35.766Z", >>>>> "numInputRows" : 0, >>>>> "inputRowsPerSecond" : 0.0, >>>>> "processedRowsPerSecond" : 0.0, >>>>> "durationMs" : { >>>>> "getOffset" : 4, >>>>> "triggerExecution" : 4 >>>>> }, >>>>> "stateOperators" : [ ], >>>>> "sources" : [ { 
>>>>> "description" : "KafkaSource[Subscribe[wikipedia]]", >>>>> "startOffset" : { >>>>> "wikipedia" : { >>>>> "2" : 0, >>>>> "4" : 0, >>>>> "1" : 0, >>>>> "3" : 0, >>>>> "0" : 0 >>>>> } >>>>> }, >>>>> "endOffset" : { >>>>> "wikipedia" : { >>>>> "2" : 0, >>>>> "4" : 0, >>>>> "1" : 0, >>>>> "3" : 0, >>>>> "0" : 0 >>>>> } >>>>> }, >>>>> "numInputRows" : 0, >>>>> "inputRowsPerSecond" : 0.0, >>>>> "processedRowsPerSecond" : 0.0 >>>>> } ], >>>>> "sink" : { >>>>> "description" : "org.apache.spark.sql.executio >>>>> n.streaming.ConsoleSink@4818d2d9" >>>>> } >>>>> } >>>>> >>>>> >>>>> On Thu, Jan 26, 2017 at 10:33 PM, Koert Kuipers <ko...@tresata.com> >>>>> wrote: >>>>> >>>>>> hey, >>>>>> i am just getting started with kafka + spark structured streaming. so >>>>>> this is probably a pretty dumb mistake. >>>>>> >>>>>> i wrote a little program in spark to read messages from a kafka topic >>>>>> and display them in the console, using the kafka source and console >>>>>> sink. i >>>>>> run it it in spark local mode. >>>>>> >>>>>> i hooked it up to a test topic that i send messages to using the >>>>>> kafka console producer, and everything works great. i type a message in >>>>>> the >>>>>> console producer, and it pops up in my spark program. very neat! >>>>>> >>>>>> next i point it to another topic instead on which a kafka-connect >>>>>> program is writing lots of irc messages. i can see kafka connect to the >>>>>> topic successfully, the partitions are discovered etc., and then... >>>>>> nothing. it just keeps stuck at offsets 0 for all partitions. at the same >>>>>> time in another terminal i can see messages coming in just fine using the >>>>>> kafka console consumer. >>>>>> >>>>>> i dont get it. why doesnt kafka want to consume from this topic in >>>>>> spark structured streaming? >>>>>> >>>>>> thanks! koert >>>>>> >>>>>> >>>>>> >>>>> >>>> >>> >> >