Hi Sumeet,

If your data volume is small and you are happy to read it in batch mode, one workaround when using ReadFromKafka is to specify max_num_records or max_read_time:
https://github.com/apache/beam/blob/8b759d1eb0dd749b5c228bc1366a1d795f2dda6e/sdks/python/apache_beam/io/kafka.py#L133-L134
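For example, a minimal, untested sketch of that bounded-read workaround (bootstrap server, topic name, and the limit values are placeholders):

~~~~~~
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions()) as pipeline:
    _ = (
        pipeline
        # With max_num_records and/or max_read_time set, the Kafka read is
        # treated as a bounded (batch) source, so it avoids the unbounded
        # x-lang read path discussed below.
        | 'Read from Kafka (bounded)' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': 'localhost:29092',
                'auto.offset.reset': 'earliest'},
            topics=['in_topic'],
            max_num_records=100,   # cap on the number of records to read
            max_read_time=60)      # cap on the read duration (assumed seconds)
        | 'Print' >> beam.Map(print))
~~~~~~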
On Wed, Mar 17, 2021 at 6:40 PM Ahmet Altay <[email protected]> wrote:

On Wed, Mar 17, 2021 at 10:23 AM Chamikara Jayalath <[email protected]> wrote:

IIUC, currently Splittable DoFn (the source framework) does not work for portable runners in streaming mode, due to the issue Boyuan mentioned.

On Tue, Mar 16, 2021 at 8:35 PM Sumeet Malhotra <[email protected]> wrote:

Thanks Cham. In the Python version I do specify the streaming option as follows (not on the command line though):

pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True)

Regarding running portable pipelines, just to confirm: what you are saying is that currently the only way to execute this is in Java, until the issue you created is resolved?

[Cham] Yes, I think Java worked since it did not use portable Spark/Flink, but a cross-language transform would require this.

Thanks,
Cham

Thanks,
Sumeet

On Wed, Mar 17, 2021 at 5:38 AM Boyuan Zhang <[email protected]> wrote:

Hi Sumeet,

After double-checking the current support status: the root cause is that when you use cross-language pipelines, your pipeline actually runs in the portable way [1]. Currently we haven't supported processing an unbounded source on Flink over portable execution well. I have filed https://issues.apache.org/jira/browse/BEAM-11998 to track the progress.

[1] https://s.apache.org/beam-fn-api

On Tue, Mar 16, 2021 at 10:13 AM Boyuan Zhang <[email protected]> wrote:

And one more question: did you launch your pipeline with the streaming=True pipeline option? I think you need to use --streaming=True to have an unbounded source working properly.

On Tue, Mar 16, 2021 at 9:41 AM Boyuan Zhang <[email protected]> wrote:

Hi Sumeet,

Which Beam version are you using for your pipeline?

On Mon, Mar 15, 2021 at 11:41 PM Chamikara Jayalath <[email protected]> wrote:

I don't believe the Fn API DirectRunner supports streaming yet (I might be wrong). I can confirm that this works for Dataflow.

[Ahmet] You are right about DirectRunner. This is tracked in https://issues.apache.org/jira/browse/BEAM-7514. (/cc +Pablo Estrada <[email protected]>)

Thanks,
Cham

On Mon, Mar 15, 2021 at 11:37 PM Sumeet Malhotra <[email protected]> wrote:

Thanks Cham! But I don't think this is Flink specific. I have observed similar behaviour with DirectRunner as well, BTW.

..Sumeet

On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath <[email protected]> wrote:

I'm not too familiar with Flink, but it seems like, for streaming pipelines, messages from the Kafka/SDF read do not get pushed to subsequent steps for some reason.
* X-lang bounded read with Flink seems to be fine.
* X-lang Kafka sink with Flink seems to be fine.

Created https://issues.apache.org/jira/browse/BEAM-11991 for tracking.

Thanks,
Cham
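For reference, the streaming=True option discussed earlier in this thread can be set either programmatically or on the command line; a minimal, untested sketch of both routes (the runner flag is only an example value):

~~~~~~
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Option 1: set the flag in code when building the options object.
opts_in_code = PipelineOptions(['--runner=FlinkRunner'], streaming=True)

# Option 2: pass --streaming on the command line, e.g.
#   python ./pipeline.py --runner=FlinkRunner --streaming ...
# and let PipelineOptions pick it up from the argument list.
opts_from_args = PipelineOptions(['--runner=FlinkRunner', '--streaming'])

# Either way, the flag lands on StandardOptions.
for opts in (opts_in_code, opts_from_args):
    print(opts.view_as(StandardOptions).streaming)  # True
~~~~~~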
On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra <[email protected]> wrote:

Hi Cham,

Do you have pointers on what might be going on? Or something else I can try? I had posted the same on StackOverflow [1]; it seems that I'm not the only one seeing this issue at the moment.

Thanks,
Sumeet

[1] https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data

On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra <[email protected]> wrote:

Took me some time to set up the Java test (using Java after more than a decade!), but yes, a similar pipeline with KafkaIO and Flink seems to work fine.

Here's the relevant Java code. The only difference from the Python version is that I had to extract the KV from the KafkaRecord object and construct a PCollection<KV> explicitly before writing to the output topic.

~~~~~~~~
package org.apache.beam.kafka.test;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaTest {

  static final String BOOTSTRAP_SERVERS = "localhost:29092"; // Default bootstrap kafka servers
  static final String INPUT_TOPIC = "in_topic";              // Default input kafka topic name
  static final String OUTPUT_TOPIC = "out_topic";            // Default output kafka topic name

  /** Specific pipeline options. */
  public interface KafkaTestOptions extends PipelineOptions {
    @Description("Kafka bootstrap servers")
    @Default.String(BOOTSTRAP_SERVERS)
    String getBootstrap();

    void setBootstrap(String value);

    @Description("Kafka input topic name")
    @Default.String(INPUT_TOPIC)
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Kafka output topic name")
    @Default.String(OUTPUT_TOPIC)
    String getOutputTopic();

    void setOutputTopic(String value);
  }

  public static final void main(String[] args) throws Exception {
    final KafkaTestOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(
            "ReadFromKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers(options.getBootstrap())
                .withTopic(options.getInputTopic())
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class))
        .apply(
            "PrepareForWriting",
            ParDo.of(
                new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) throws Exception {
                    c.output(KV.of(c.element().getKV().getKey(), c.element().getKV().getValue()));
                  }
                }))
        .apply(
            "WriteToKafka",
            KafkaIO.<String, String>write()
                .withBootstrapServers(options.getBootstrap())
                .withTopic(options.getOutputTopic())
                .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
                .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class));

    pipeline.run();
  }
}
~~~~~~~~

I'm firing the Java version as follows:

$ mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.KafkaTest -Pflink-runner -Dexec.args="--runner=FlinkRunner"

And I can see, in real time, that as I publish records to the in_topic, the out_topic is able to receive them on a continuous basis.

I hope this helps narrow down the issue.

Thanks,
Sumeet

On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <[email protected]> wrote:

Are you able to run a similar Java streaming pipeline using KafkaIO and Flink? (without x-lang)

Thanks,
Cham

On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <[email protected]> wrote:

Hi Cham!

So finally I was able to get partial success.
Since I had pre-populated the Kafka topic (in_topic) with 3 records, I set max_num_records=3 to see if it can read all existing records, as follows:

with beam.Pipeline(options=pipeline_options) as pipeline:
  _ = (
      pipeline
      | 'Read from kafka' >> ReadFromKafka(
          consumer_config={
              'bootstrap.servers': bootstrap_servers,
              'auto.offset.reset': 'earliest'},
          topics=[in_topic],
          max_num_records=3)
      | 'Write to kafka' >> WriteToKafka(
          producer_config={
              'bootstrap.servers': bootstrap_servers},
          topic=out_topic))

I was able to see all 3 records being read, and written successfully to the out_topic as well. So it appears that there might be some issue with reading unbounded Kafka streams here? Or is there any setting that I might be missing?

Thanks,
Sumeet

On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <[email protected]> wrote:

Hey Cham!

Appreciate the response. I tried out your suggestions (details below), but I still don't see any data being consumed or written back to Kafka (as per your suggestion). I'm also providing additional details/context that might help narrow down the issue. Apologies for being a bit verbose from here on!

First, here's what my pipeline code looks like now:

~~~~~~
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.kafka import WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions

def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
  pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True)

  logging.info('Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
               str(bootstrap_servers), in_topic, out_topic)

  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from kafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': bootstrap_servers,
                'auto.offset.reset': 'earliest'
            },
            topics=[in_topic])
        | 'Write to kafka' >> WriteToKafka(
            producer_config={
                'bootstrap.servers': bootstrap_servers
            },
            topic=out_topic))

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  import argparse

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--bootstrap_servers',
      dest='bootstrap_servers',
      required=True,
      help='Bootstrap servers for the Kafka cluster')
  parser.add_argument(
      '--in_topic',
      dest='in_topic',
      required=True,
      help='Kafka topic to read data from')
  parser.add_argument(
      '--out_topic',
      dest='out_topic',
      required=True,
      help='Kafka topic to write data to')
  known_args, pipeline_args = parser.parse_known_args()

  run(known_args.bootstrap_servers, known_args.in_topic, known_args.out_topic, pipeline_args)
~~~~~~

I'm firing this pipeline as follows:

python ./pipeline.py --bootstrap_servers=localhost:29092 --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner

I have pre-populated the Kafka topic with 3 records:

$ kafkacat -C -b localhost:29092 -t in_topic
v1
v2
v3

Now, when I execute the pipeline, I see that it starts to read records from offset 0, but then seeks to the latest offset 3 without processing the records. I don't see any data written to out_topic. I filtered out the logs a bit, and this is what I'm seeing:

INFO:root:Starting data pipeline. bootstrap_servers=localhost:29092 in_topic=in_topic out_topic=out_topic
INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions assigned to split 0 (total 1): in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Subscribed to partition(s): in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Resetting offset for partition in_topic-0 to offset 0.'
INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading from in_topic-0 starting at offset 0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to partition(s): in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST offset of partition in_topic-0'
INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for partition in_topic-0 to offset 3.'

Additionally, the logs also emit the complete consumer and producer configs. I'm dumping them here, in case that helps:

Consumer Config:

INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig values:'
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:29092]
    check.crcs = true
    client.dns.lookup = default
    client.id =
    client.rack =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = Reader-0_offset_consumer_1947524890_none
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

Producer Config:

INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig values:'
    acks = 1
    batch.size = 16384
    bootstrap.servers = [localhost:29092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id =
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 3
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

Apologies again for dumping almost everything here :-) Any pointers on what might be the issue are appreciated.

Thanks,
Sumeet

On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <[email protected]> wrote:

Also, can you try sending messages back to Kafka (or another distributed system like GCS) instead of just printing them? (Given that multi-language pipelines run SDK containers in Docker, you might not see prints in the original console, I think.)

Thanks,
Cham

On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <[email protected]> wrote:

Hi Sumeet,

It seems like your Kafka consumer uses the LATEST offset (which is the default setting) as the start offset to read, which is 29. Do you have more than 29 records to read at that point? If the pipeline is only for testing purposes, I would recommend reading from the earliest offset to see whether you get records.
You can do so by constructing your ReadFromKafka like:

ReadFromKafka(
    consumer_config={'bootstrap.servers': 'localhost:29092', 'auto.offset.reset': 'earliest'},
    topics=['test'])

On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <[email protected]> wrote:

Hi All,

I'm trying out a simple example of reading data off a Kafka topic into Apache Beam. Here's the relevant snippet:

with beam.Pipeline(options=pipeline_options) as pipeline:
  _ = (
      pipeline
      | 'Read from Kafka' >> ReadFromKafka(
          consumer_config={'bootstrap.servers': 'localhost:29092'},
          topics=['test'])
      | 'Print' >> beam.Map(print))

Using the above Beam pipeline snippet, I don't see any messages coming in. Kafka is running locally in a docker container, and I'm able to use `kafkacat` from the host (outside the container) to publish and subscribe to messages. So I guess there are no issues on that front.

It appears that Beam is able to connect to Kafka and get notified of new messages, as I see the offset changes in the Beam logs as I publish data from `kafkacat`:

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

This is how I'm publishing data using `kafkacat`:

$ kafkacat -P -b localhost:29092 -t test -K:
1:foo
1:bar

and I can confirm that it's being received, again using `kafkacat`:

$ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
Key: 1 Value: foo
Key: 1 Value: bar

But despite this, I don't see the actual message being printed by Beam as I expected. Any pointers to what's missing here are appreciated. I'm suspecting this could be a decoding issue on the Beam pipeline side, but I could be wrong.

Thanks in advance for any pointers!

Cheers,
Sumeet
