Thanks for following up! I've linked the relevant tickets to SPARK-18057 <https://issues.apache.org/jira/browse/SPARK-18057> and targeted it for Spark 2.2.
On Sat, Jan 28, 2017 at 10:15 AM, Koert Kuipers <ko...@tresata.com> wrote:

> there was also already an existing spark ticket for this:
> SPARK-18779 <https://issues.apache.org/jira/browse/SPARK-18779>
>
> On Sat, Jan 28, 2017 at 1:13 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> it seems the bug is:
>> https://issues.apache.org/jira/browse/KAFKA-4547
>>
>> i would advise everyone not to use kafka-clients 0.10.0.2, 0.10.1.0 or 0.10.1.1
>>
>> On Fri, Jan 27, 2017 at 3:56 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> in case anyone else runs into this:
>>>
>>> the issue is that i was using kafka-clients 0.10.1.1
>>>
>>> it works when i use kafka-clients 0.10.0.1 with spark structured streaming
>>>
>>> my kafka server is 0.10.1.1
>>>
>>> On Fri, Jan 27, 2017 at 1:24 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> i checked my topic. it has 5 partitions but all the data is written to a single partition: wikipedia-2
>>>> i turned on debug logging and i see this:
>>>>
>>>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Partitions assigned to consumer: [wikipedia-0, wikipedia-4, wikipedia-3, wikipedia-2, wikipedia-1]. Seeking to the end.
>>>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-0
>>>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-4
>>>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-3
>>>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-2
>>>> 2017-01-27 13:02:50 DEBUG consumer.KafkaConsumer: Seeking to end of partition wikipedia-1
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-0 to latest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-0
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-0 to earliest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-0
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-4 to latest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-4
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-4 to earliest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-4
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-3 to latest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.AbstractCoordinator: Received successful heartbeat response for group spark-kafka-source-fac4f749-fd56-4a32-82c7-e687aadf520b-1923704552-driver-0
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-3
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-3 to earliest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-3
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-2 to latest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-2 to earliest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-2
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Resetting offset for partition wikipedia-1 to latest offset.
>>>> 2017-01-27 13:02:50 DEBUG internals.Fetcher: Fetched {timestamp=-1, offset=0} for partition wikipedia-1
>>>> 2017-01-27 13:02:50 DEBUG kafka010.KafkaSource: Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>>>>
>>>> what is confusing to me is this:
>>>> Resetting offset for partition wikipedia-2 to latest offset.
>>>> Fetched {timestamp=-1, offset=152908} for partition wikipedia-2
>>>> Got latest offsets for partition : Map(wikipedia-1 -> 0, wikipedia-4 -> 0, wikipedia-2 -> 0, wikipedia-3 -> 0, wikipedia-0 -> 0)
>>>>
>>>> why does it find latest offset 152908 for wikipedia-2 but then set the latest offset to 0 for that partition? or am i misunderstanding?
>>>>
>>>> On Fri, Jan 27, 2017 at 1:19 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>>
>>>>> code:
>>>>>
>>>>> val query = spark.readStream
>>>>>   .format("kafka")
>>>>>   .option("kafka.bootstrap.servers", "somenode:9092")
>>>>>   .option("subscribe", "wikipedia")
>>>>>   .load
>>>>>   .select(col("value") cast StringType)
>>>>>   .writeStream
>>>>>   .format("console")
>>>>>   .outputMode(OutputMode.Append)
>>>>>   .start()
>>>>>
>>>>> while (true) {
>>>>>   Thread.sleep(10000)
>>>>>   println(query.lastProgress)
>>>>> }
>>>>>
>>>>> On Fri, Jan 27, 2017 at 5:34 AM, Alonso Isidoro Roman <alons...@gmail.com> wrote:
>>>>>
>>>>>> lets see the code...
>>>>>> Alonso Isidoro Roman
>>>>>> <https://about.me/alonso.isidoro.roman>
>>>>>>
>>>>>> 2017-01-27 5:56 GMT+01:00 Koert Kuipers <ko...@tresata.com>:
>>>>>>
>>>>>>> my little program prints out query.lastProgress every 10 seconds, and this is what it shows:
>>>>>>>
>>>>>>> {
>>>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>>>   "name" : "wiki",
>>>>>>>   "timestamp" : "2017-01-26T22:54:45.732Z",
>>>>>>>   "numInputRows" : 0,
>>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>>   "durationMs" : {
>>>>>>>     "getOffset" : 9,
>>>>>>>     "triggerExecution" : 10
>>>>>>>   },
>>>>>>>   "stateOperators" : [ ],
>>>>>>>   "sources" : [ {
>>>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>>>     "startOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>>>     "endOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>>>     "numInputRows" : 0,
>>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>>   } ],
>>>>>>>   "sink" : {
>>>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>>>   }
>>>>>>> }
>>>>>>> {
>>>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>>>   "name" : "wiki",
>>>>>>>   "timestamp" : "2017-01-26T22:54:55.745Z",
>>>>>>>   "numInputRows" : 0,
>>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>>   "durationMs" : {
>>>>>>>     "getOffset" : 5,
>>>>>>>     "triggerExecution" : 5
>>>>>>>   },
>>>>>>>   "stateOperators" : [ ],
>>>>>>>   "sources" : [ {
>>>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>>>     "startOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>>>     "endOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>>>     "numInputRows" : 0,
>>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>>   } ],
>>>>>>>   "sink" : {
>>>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>>>   }
>>>>>>> }
>>>>>>> {
>>>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>>>   "name" : "wiki",
>>>>>>>   "timestamp" : "2017-01-26T22:55:05.748Z",
>>>>>>>   "numInputRows" : 0,
>>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>>   "durationMs" : {
>>>>>>>     "getOffset" : 5,
>>>>>>>     "triggerExecution" : 5
>>>>>>>   },
>>>>>>>   "stateOperators" : [ ],
>>>>>>>   "sources" : [ {
>>>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>>>     "startOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>>>     "endOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>>>     "numInputRows" : 0,
>>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>>   } ],
>>>>>>>   "sink" : {
>>>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>>>   }
>>>>>>> }
>>>>>>> {
>>>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>>>   "name" : "wiki",
>>>>>>>   "timestamp" : "2017-01-26T22:55:15.758Z",
>>>>>>>   "numInputRows" : 0,
>>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>>   "durationMs" : {
>>>>>>>     "getOffset" : 4,
>>>>>>>     "triggerExecution" : 4
>>>>>>>   },
>>>>>>>   "stateOperators" : [ ],
>>>>>>>   "sources" : [ {
>>>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>>>     "startOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>>>     "endOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>>>     "numInputRows" : 0,
>>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>>   } ],
>>>>>>>   "sink" : {
>>>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>>>   }
>>>>>>> }
>>>>>>> {
>>>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>>>   "name" : "wiki",
>>>>>>>   "timestamp" : "2017-01-26T22:55:25.760Z",
>>>>>>>   "numInputRows" : 0,
>>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>>   "durationMs" : {
>>>>>>>     "getOffset" : 4,
>>>>>>>     "triggerExecution" : 4
>>>>>>>   },
>>>>>>>   "stateOperators" : [ ],
>>>>>>>   "sources" : [ {
>>>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>>>     "startOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>>>     "endOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>>>     "numInputRows" : 0,
>>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>>   } ],
>>>>>>>   "sink" : {
>>>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>>>   }
>>>>>>> }
>>>>>>> {
>>>>>>>   "id" : "4cc09da1-c002-4fa7-86dd-ba03018e53a0",
>>>>>>>   "runId" : "bf8cdde2-44d0-4bff-ad90-7cbac0432099",
>>>>>>>   "name" : "wiki",
>>>>>>>   "timestamp" : "2017-01-26T22:55:35.766Z",
>>>>>>>   "numInputRows" : 0,
>>>>>>>   "inputRowsPerSecond" : 0.0,
>>>>>>>   "processedRowsPerSecond" : 0.0,
>>>>>>>   "durationMs" : {
>>>>>>>     "getOffset" : 4,
>>>>>>>     "triggerExecution" : 4
>>>>>>>   },
>>>>>>>   "stateOperators" : [ ],
>>>>>>>   "sources" : [ {
>>>>>>>     "description" : "KafkaSource[Subscribe[wikipedia]]",
>>>>>>>     "startOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>>>     "endOffset" : { "wikipedia" : { "2" : 0, "4" : 0, "1" : 0, "3" : 0, "0" : 0 } },
>>>>>>>     "numInputRows" : 0,
>>>>>>>     "inputRowsPerSecond" : 0.0,
>>>>>>>     "processedRowsPerSecond" : 0.0
>>>>>>>   } ],
>>>>>>>   "sink" : {
>>>>>>>     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@4818d2d9"
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> On Thu, Jan 26, 2017 at 10:33 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>>>>>
>>>>>>>> hey,
>>>>>>>> i am just getting started with kafka + spark structured streaming. so this is probably a pretty dumb mistake.
>>>>>>>>
>>>>>>>> i wrote a little program in spark to read messages from a kafka topic and display them in the console, using the kafka source and console sink. i run it in spark local mode.
>>>>>>>>
>>>>>>>> i hooked it up to a test topic that i send messages to using the kafka console producer, and everything works great. i type a message in the console producer, and it pops up in my spark program. very neat!
>>>>>>>>
>>>>>>>> next i point it at another topic instead, on which a kafka-connect program is writing lots of irc messages. i can see kafka connect to the topic successfully, the partitions are discovered etc., and then... nothing. it just keeps stuck at offset 0 for all partitions.
>>>>>>>> at the same time in another terminal i can see messages coming in just fine using the kafka console consumer.
>>>>>>>>
>>>>>>>> i dont get it. why doesnt kafka want to consume from this topic in spark structured streaming?
>>>>>>>>
>>>>>>>> thanks! koert
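[Editor's note: the workaround in the thread, pinning kafka-clients to 0.10.0.1, can be sketched as an sbt build fragment. This is a hedged sketch, not from the thread itself: it assumes an sbt project, and the connector coordinates and versions shown are illustrative and should match your own Spark release.]

```scala
// build.sbt (sketch)

// The structured streaming Kafka source; version shown is an assumption,
// use the one matching your Spark installation.
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.1.0"

// Pin kafka-clients to 0.10.0.1, overriding whatever version the connector
// pulls in transitively, to avoid the KAFKA-4547 offset-reset bug reported
// above for the 0.10.1.x clients.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "0.10.0.1"
```

After changing the override, `sbt update` followed by inspecting the resolved classpath (e.g. `sbt dependencyTree` with the plugin installed) confirms which kafka-clients version actually ends up on the driver and executors.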
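[Editor's note: when the consumer claims latest offset 0 while data is clearly being written, as in the debug log above, the broker's own view of the offsets can be checked independently of the (possibly buggy) client. A sketch, assuming a standard Kafka distribution layout and the somenode:9092 broker from the example; it requires a running broker, so treat the command as illustrative.]

```shell
# Ask the broker directly for per-partition offsets of the topic.
# --time -1 requests the latest offset, --time -2 the earliest.
# Output lines have the form topic:partition:offset.
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list somenode:9092 \
  --topic wikipedia \
  --time -1
```

If this shows a nonzero latest offset (e.g. wikipedia:2:152908) while the streaming query's progress reports 0 for every partition, the problem is on the client side rather than in the topic, which is consistent with the KAFKA-4547 diagnosis in the thread.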