On Wed, Mar 17, 2021 at 10:23 AM Chamikara Jayalath <[email protected]>
wrote:

> IIUC, currently Splittable DoFn (the source framework) does not work for
> portable runners in streaming mode, due to the issue Boyuan mentioned.
>
> On Tue, Mar 16, 2021 at 8:35 PM Sumeet Malhotra <[email protected]>
> wrote:
>
>> Thanks Cham. In the Python version, I do specify the streaming option as
>> follows (not on the command line though):
>>
>> pipeline_options = PipelineOptions(pipeline_args, save_main_session=True,
>> streaming=True)
>>
>> Regarding running portable pipelines, just to confirm: are you saying that,
>> until the issue you created is resolved, the only way to execute this is
>> currently in Java?
>>
>
> Yes, I think the Java pipeline worked since it did not use portable
> Spark/Flink, but a cross-language transform would require portable execution.
>
> Thanks,
> Cham
>
>
>>
>> Thanks,
>> Sumeet
>>
>>
>> On Wed, Mar 17, 2021 at 5:38 AM Boyuan Zhang <[email protected]> wrote:
>>
>>> Hi Sumeet,
>>>
>>> After double-checking the current support status: the root cause is that
>>> when you use cross-language pipelines, the pipeline actually runs in the
>>> portable way [1]. Currently we don't support processing unbounded sources
>>> on Flink over portable execution well. I have filed
>>> https://issues.apache.org/jira/browse/BEAM-11998 to track the progress.
>>>
>>> [1] https://s.apache.org/beam-fn-api
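>>>
>>> For context: in Python, the Flink runner always goes through this portable
>>> framework, so a typical invocation against a local Flink cluster would look
>>> roughly like the following (the flags are illustrative and depend on your
>>> setup):
>>>
>>>   python ./pipeline.py --runner=FlinkRunner --flink_master=localhost:8081 \
>>>       --environment_type=LOOPBACK --streaming ...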
>>>
>>>
>>> On Tue, Mar 16, 2021 at 10:13 AM Boyuan Zhang <[email protected]>
>>> wrote:
>>>
>>>> And one more question: did you launch your pipeline with the streaming=True
>>>> pipeline option? I think you need to pass --streaming (i.e. enable streaming
>>>> mode) to have an unbounded source working properly.
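>>>>
>>>> For example, either pass --streaming on the command line, or set it in code,
>>>> roughly like this (assuming pipeline_args holds the rest of your args):
>>>>
>>>>   from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
>>>>
>>>>   options = PipelineOptions(pipeline_args, streaming=True)
>>>>   # or, equivalently, flip the standard option after constructing them:
>>>>   options.view_as(StandardOptions).streaming = True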
>>>>
>>>> On Tue, Mar 16, 2021 at 9:41 AM Boyuan Zhang <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Sumeet,
>>>>>
>>>>> Which Beam version are you using for your pipeline?
>>>>>
>>>>> On Mon, Mar 15, 2021 at 11:41 PM Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> I don't believe Fn API DirectRunner supports streaming yet (I might
>>>>>> be wrong). I can confirm that this works for Dataflow.
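>>>>>>
>>>>>> For reference, a Dataflow invocation would look roughly like the following
>>>>>> (flags are illustrative, and the Kafka broker has to be reachable from the
>>>>>> Dataflow workers):
>>>>>>
>>>>>>   python ./pipeline.py \
>>>>>>       --bootstrap_servers=<broker reachable from workers> \
>>>>>>       --in_topic=in_topic --out_topic=out_topic \
>>>>>>       --runner=DataflowRunner --project=<gcp-project> --region=us-central1 \
>>>>>>       --temp_location=gs://<bucket>/tmp --streaming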
>>>>>>
>>>>>
You are right about DirectRunner. And this is tracked in
https://issues.apache.org/jira/browse/BEAM-7514. (/cc +Pablo Estrada
<[email protected]>)


>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Mon, Mar 15, 2021 at 11:37 PM Sumeet Malhotra <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Thanks Cham! But I don't think this is Flink specific. I have
>>>>>>> observed similar behaviour with DirectRunner as well BTW.
>>>>>>>
>>>>>>> ..Sumeet
>>>>>>>
>>>>>>> On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> I'm not too familiar with Flink but it seems like, for streaming
>>>>>>>> pipelines, messages from Kafka/SDF read do not get pushed to subsequent
>>>>>>>> steps for some reason.
>>>>>>>> * X-lang Bounded read with Flink seems to be fine.
>>>>>>>> * X-lang Kafka sink with Flink seems to be fine.
>>>>>>>>
>>>>>>>> Created https://issues.apache.org/jira/browse/BEAM-11991 for
>>>>>>>> tracking.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cham
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Cham,
>>>>>>>>>
>>>>>>>>> Do you have pointers on what might be going on? Or something else I can
>>>>>>>>> try? I had posted the same on StackOverflow [1]; it seems that I'm not the
>>>>>>>>> only one seeing this issue at the moment.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Sumeet
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Took me some time to set up the Java test (using Java after more than a
>>>>>>>>>> decade!), but yes, a similar pipeline with KafkaIO and Flink seems to work
>>>>>>>>>> fine.
>>>>>>>>>>
>>>>>>>>>> Here's the relevant Java code. The only difference from the Python version
>>>>>>>>>> is that I had to extract the KV from the KafkaRecord object and construct a
>>>>>>>>>> PCollection<KV> explicitly before writing to the output topic.
>>>>>>>>>>
>>>>>>>>>> ~~~~~~~~
>>>>>>>>>> package org.apache.beam.kafka.test;
>>>>>>>>>>
>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaRecord;
>>>>>>>>>> import org.apache.beam.sdk.options.Default;
>>>>>>>>>> import org.apache.beam.sdk.options.Description;
>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>> import org.apache.beam.sdk.transforms.*;
>>>>>>>>>> import org.apache.beam.sdk.values.KV;
>>>>>>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>>>>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>>>>>>>>>
>>>>>>>>>> public class KafkaTest {
>>>>>>>>>>
>>>>>>>>>>   static final String BOOTSTRAP_SERVERS = "localhost:29092"; // Default bootstrap kafka servers
>>>>>>>>>>   static final String INPUT_TOPIC = "in_topic"; // Default input kafka topic name
>>>>>>>>>>   static final String OUTPUT_TOPIC = "out_topic"; // Default output kafka topic name
>>>>>>>>>>
>>>>>>>>>>   /** Specific pipeline options. */
>>>>>>>>>>   public interface KafkaTestOptions extends PipelineOptions {
>>>>>>>>>>     @Description("Kafka bootstrap servers")
>>>>>>>>>>     @Default.String(BOOTSTRAP_SERVERS)
>>>>>>>>>>     String getBootstrap();
>>>>>>>>>>
>>>>>>>>>>     void setBootstrap(String value);
>>>>>>>>>>
>>>>>>>>>>     @Description("Kafka input topic name")
>>>>>>>>>>     @Default.String(INPUT_TOPIC)
>>>>>>>>>>     String getInputTopic();
>>>>>>>>>>
>>>>>>>>>>     void setInputTopic(String value);
>>>>>>>>>>
>>>>>>>>>>     @Description("Kafka output topic name")
>>>>>>>>>>     @Default.String(OUTPUT_TOPIC)
>>>>>>>>>>     String getOutputTopic();
>>>>>>>>>>
>>>>>>>>>>     void setOutputTopic(String value);
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   public static final void main(String[] args) throws Exception {
>>>>>>>>>>     final KafkaTestOptions options =
>>>>>>>>>>         PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);
>>>>>>>>>>
>>>>>>>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>     pipeline
>>>>>>>>>>         .apply(
>>>>>>>>>>             "ReadFromKafka",
>>>>>>>>>>             KafkaIO.<String, String>read()
>>>>>>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>>>>>>                 .withTopic(options.getInputTopic())
>>>>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>>>>>>         .apply(
>>>>>>>>>>             "PrepareForWriting",
>>>>>>>>>>             ParDo.of(
>>>>>>>>>>                 new DoFn<KafkaRecord<String, String>, KV<String,
>>>>>>>>>> String>>() {
>>>>>>>>>>                   @ProcessElement
>>>>>>>>>>                   public void processElement(ProcessContext c)
>>>>>>>>>> throws Exception {
>>>>>>>>>>                     c.output(KV.of(c.element().getKV().getKey(),
>>>>>>>>>> c.element().getKV().getValue()));
>>>>>>>>>>                   }
>>>>>>>>>>                 }))
>>>>>>>>>>         .apply(
>>>>>>>>>>             "WriteToKafka",
>>>>>>>>>>             KafkaIO.<String, String>write()
>>>>>>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>>>>>>                 .withTopic(options.getOutputTopic())
>>>>>>>>>>                 .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
>>>>>>>>>>                 .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class));
>>>>>>>>>>
>>>>>>>>>>     pipeline.run();
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>> ~~~~~~~~~
>>>>>>>>>>
>>>>>>>>>> I'm firing the Java version as follows:
>>>>>>>>>>
>>>>>>>>>> $ mvn exec:java -Dexec.mainClass=org.apache.beam.kafka.test.KafkaTest \
>>>>>>>>>>     -Pflink-runner -Dexec.args="--runner=FlinkRunner"
>>>>>>>>>>
>>>>>>>>>> And I can see, in real time, that as I publish records to in_topic,
>>>>>>>>>> out_topic receives them on a continuous basis.
>>>>>>>>>>
>>>>>>>>>> I hope this helps narrow down the issue.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Sumeet
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you able to run a similar Java streaming pipeline using
>>>>>>>>>>> KafkaIO and Flink ? (without x-lang)
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Cham
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Cham!
>>>>>>>>>>>>
>>>>>>>>>>>> So finally I was able to get partial success. Since I had pre-populated
>>>>>>>>>>>> the Kafka topic (in_topic) with 3 records, I set max_num_records=3 to see
>>>>>>>>>>>> if it could read all existing records, as follows:
>>>>>>>>>>>>
>>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>>     _ = (
>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>>>>>             consumer_config={
>>>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>>>>>>                 'auto.offset.reset': 'earliest'},
>>>>>>>>>>>>             topics=[in_topic],
>>>>>>>>>>>>             max_num_records=3)
>>>>>>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>>>>             producer_config={
>>>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers},
>>>>>>>>>>>>             topic=out_topic))
>>>>>>>>>>>>
>>>>>>>>>>>> I was able to see all 3 records being read and written successfully to
>>>>>>>>>>>> the out_topic as well. So, it appears that there might be some issue with
>>>>>>>>>>>> reading unbounded Kafka streams here? Or is there any setting that I might
>>>>>>>>>>>> be missing?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Sumeet
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hey Cham!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Appreciate the response. I tried out your suggestions (details below),
>>>>>>>>>>>>> but I still don't see any data being consumed or written back to Kafka
>>>>>>>>>>>>> (as per your suggestion). I'm also providing additional details/context
>>>>>>>>>>>>> that might help narrow down the issue. Apologies for being a bit verbose
>>>>>>>>>>>>> from here on!
>>>>>>>>>>>>>
>>>>>>>>>>>>> First, here's what my pipeline code looks like now:
>>>>>>>>>>>>>
>>>>>>>>>>>>> ~~~~~~
>>>>>>>>>>>>> import logging
>>>>>>>>>>>>>
>>>>>>>>>>>>> import apache_beam as beam
>>>>>>>>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>>>>>>>> from apache_beam.io.kafka import WriteToKafka
>>>>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>>>>>
>>>>>>>>>>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>>>>>>>>>>>>   pipeline_options = PipelineOptions(pipeline_args,
>>>>>>>>>>>>> save_main_session=True, streaming=True)
>>>>>>>>>>>>>
>>>>>>>>>>>>>   logging.info('Starting data pipeline. bootstrap_servers=%s
>>>>>>>>>>>>> in_topic=%s out_topic=%s',
>>>>>>>>>>>>>       str(bootstrap_servers), in_topic, out_topic)
>>>>>>>>>>>>>
>>>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>>>     _ = (
>>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>>>>>>             consumer_config={
>>>>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>>>>>>>                 'auto.offset.reset': 'earliest'
>>>>>>>>>>>>>             },
>>>>>>>>>>>>>             topics=[in_topic])
>>>>>>>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>>>>>             producer_config={
>>>>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers
>>>>>>>>>>>>>             },
>>>>>>>>>>>>>             topic=out_topic))
>>>>>>>>>>>>>
>>>>>>>>>>>>> if __name__ == '__main__':
>>>>>>>>>>>>>   logging.getLogger().setLevel(logging.INFO)
>>>>>>>>>>>>>   import argparse
>>>>>>>>>>>>>
>>>>>>>>>>>>>   parser = argparse.ArgumentParser()
>>>>>>>>>>>>>   parser.add_argument(
>>>>>>>>>>>>>       '--bootstrap_servers',
>>>>>>>>>>>>>       dest='bootstrap_servers',
>>>>>>>>>>>>>       required=True,
>>>>>>>>>>>>>       help='Bootstrap servers for the Kafka cluster')
>>>>>>>>>>>>>   parser.add_argument(
>>>>>>>>>>>>>       '--in_topic',
>>>>>>>>>>>>>       dest='in_topic',
>>>>>>>>>>>>>       required=True,
>>>>>>>>>>>>>       help='Kafka topic to read data from')
>>>>>>>>>>>>>   parser.add_argument(
>>>>>>>>>>>>>       '--out_topic',
>>>>>>>>>>>>>       dest='out_topic',
>>>>>>>>>>>>>       required=True,
>>>>>>>>>>>>>       help='Kafka topic to write data to')
>>>>>>>>>>>>>   known_args, pipeline_args = parser.parse_known_args()
>>>>>>>>>>>>>
>>>>>>>>>>>>>   run(known_args.bootstrap_servers, known_args.in_topic,
>>>>>>>>>>>>> known_args.out_topic, pipeline_args)
>>>>>>>>>>>>> ~~~~~
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm firing this pipeline as follows:
>>>>>>>>>>>>>
>>>>>>>>>>>>> python ./pipeline.py --bootstrap_servers=localhost:29092
>>>>>>>>>>>>> --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have pre-populated the Kafka topic with 3 records:
>>>>>>>>>>>>>
>>>>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t in_topic
>>>>>>>>>>>>> v1
>>>>>>>>>>>>> v2
>>>>>>>>>>>>> v3
>>>>>>>>>>>>>
>>>>>>>>>>>>> Now, when I execute the pipeline, I see that it starts to read records
>>>>>>>>>>>>> from offset 0, but then seeks to the latest offset 3 without processing
>>>>>>>>>>>>> the records. I don't see any data written to out_topic. I filtered out
>>>>>>>>>>>>> the logs a bit, and this is what I'm seeing:
>>>>>>>>>>>>>
>>>>>>>>>>>>> INFO:root:Starting data pipeline.
>>>>>>>>>>>>> bootstrap_servers=localhost:29092 in_topic=in_topic 
>>>>>>>>>>>>> out_topic=out_topic
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions
>>>>>>>>>>>>> assigned to split 0 (total 1): in_topic-0'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>>>>>> clientId=consumer-2, groupId=null] Subscribed to partition(s): 
>>>>>>>>>>>>> in_topic-0'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>>>>>> clientId=consumer-2, groupId=null] Resetting offset for partition
>>>>>>>>>>>>> in_topic-0 to offset 0.'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0:
>>>>>>>>>>>>> reading from in_topic-0 starting at offset 0'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to
>>>>>>>>>>>>> partition(s): in_topic-0'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Seeking to 
>>>>>>>>>>>>> LATEST offset
>>>>>>>>>>>>> of partition in_topic-0'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Resetting 
>>>>>>>>>>>>> offset for
>>>>>>>>>>>>> partition in_topic-0 to offset 3.'
>>>>>>>>>>>>>
>>>>>>>>>>>>> Additionally, the logs emit the complete consumer and producer configs.
>>>>>>>>>>>>> I'm dumping them here in case that helps:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Consumer Config:
>>>>>>>>>>>>>
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO:
>>>>>>>>>>>>> ConsumerConfig values:'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics
>>>>>>>>>>>>> = true'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> tauto.commit.interval.ms = 5000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset
>>>>>>>>>>>>> = earliest'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers
>>>>>>>>>>>>> = [localhost:29092]'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup
>>>>>>>>>>>>> = default'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack ='
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> tconnections.max.idle.ms = 540000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> tdefault.api.timeout.ms = 60000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit
>>>>>>>>>>>>> = false'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics
>>>>>>>>>>>>> = true'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes =
>>>>>>>>>>>>> 52428800'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms
>>>>>>>>>>>>> = 500'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes =
>>>>>>>>>>>>> 1'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id =
>>>>>>>>>>>>> Reader-0_offset_consumer_1947524890_none'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> theartbeat.interval.ms = 3000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes
>>>>>>>>>>>>> = []'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close
>>>>>>>>>>>>> = true'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level =
>>>>>>>>>>>>> read_uncommitted'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer
>>>>>>>>>>>>> = class 
>>>>>>>>>>>>> org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes
>>>>>>>>>>>>> = 1048576'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> tmax.poll.interval.ms = 300000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records
>>>>>>>>>>>>> = 500'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> tmetadata.max.age.ms = 300000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters
>>>>>>>>>>>>> = []'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples
>>>>>>>>>>>>> = 2'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level
>>>>>>>>>>>>> = INFO'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> tmetrics.sample.window.ms = 30000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy
>>>>>>>>>>>>> = [class org.apache.kafka.clients.consumer.RangeAssignor]'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes
>>>>>>>>>>>>> = 65536'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> treconnect.backoff.max.ms = 1000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> treconnect.backoff.ms = 50'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> trequest.timeout.ms = 30000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms
>>>>>>>>>>>>> = 100'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd
>>>>>>>>>>>>> = /usr/bin/kinit'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>>>>>>>>>>>> = 60000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> tsasl.kerberos.service.name = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>>>>>>>>>>>> = 0.05'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>>>>>>>>>>>> = 0.8'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>>>>>>>>>>>> = 300'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>>>>>>>>>>>> = 60'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>>>>>>>>>>>> = 0.8'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>>>>>>>>>>>> = 0.05'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism =
>>>>>>>>>>>>> GSSAPI'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol
>>>>>>>>>>>>> = PLAINTEXT'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes
>>>>>>>>>>>>> = 131072'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> tsession.timeout.ms = 10000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols
>>>>>>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>>>>>>>>>>>> = https'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm
>>>>>>>>>>>>> = SunX509'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type
>>>>>>>>>>>>> = JKS'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol =
>>>>>>>>>>>>> TLS'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider =
>>>>>>>>>>>>> null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>>>>>>>>>>>> = PKIX'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type
>>>>>>>>>>>>> = JKS'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer
>>>>>>>>>>>>> = class 
>>>>>>>>>>>>> org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>>>>>>>>
>>>>>>>>>>>>> Producer Config:
>>>>>>>>>>>>>
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO:
>>>>>>>>>>>>> ProducerConfig values:'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size =
>>>>>>>>>>>>> 16384'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers
>>>>>>>>>>>>> = [localhost:29092]'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory =
>>>>>>>>>>>>> 33554432'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup
>>>>>>>>>>>>> = default'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type
>>>>>>>>>>>>> = none'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> tconnections.max.idle.ms = 540000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> tdelivery.timeout.ms = 120000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence
>>>>>>>>>>>>> = false'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes
>>>>>>>>>>>>> = []'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer =
>>>>>>>>>>>>> class org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms =
>>>>>>>>>>>>> 60000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection
>>>>>>>>>>>>> = 5'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size
>>>>>>>>>>>>> = 1048576'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> tmetadata.max.age.ms = 300000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters
>>>>>>>>>>>>> = []'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples
>>>>>>>>>>>>> = 2'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level
>>>>>>>>>>>>> = INFO'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> tmetrics.sample.window.ms = 30000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class
>>>>>>>>>>>>> = class 
>>>>>>>>>>>>> org.apache.kafka.clients.producer.internals.DefaultPartitioner'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes
>>>>>>>>>>>>> = 32768'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> treconnect.backoff.max.ms = 1000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> treconnect.backoff.ms = 50'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> trequest.timeout.ms = 30000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms
>>>>>>>>>>>>> = 100'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd
>>>>>>>>>>>>> = /usr/bin/kinit'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>>>>>>>>>>>> = 60000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> tsasl.kerberos.service.name = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>>>>>>>>>>>> = 0.05'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>>>>>>>>>>>> = 0.8'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>>>>>>>>>>>> = 300'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>>>>>>>>>>>> = 60'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>>>>>>>>>>>> = 0.8'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>>>>>>>>>>>> = 0.05'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism =
>>>>>>>>>>>>> GSSAPI'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol
>>>>>>>>>>>>> = PLAINTEXT'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes
>>>>>>>>>>>>> = 131072'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols
>>>>>>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>>>>>>>>>>>> = https'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm
>>>>>>>>>>>>> = SunX509'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type
>>>>>>>>>>>>> = JKS'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol =
>>>>>>>>>>>>> TLS'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider =
>>>>>>>>>>>>> null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>>>>>>>>>>>> = PKIX'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type
>>>>>>>>>>>>> = JKS'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>> ttransaction.timeout.ms = 60000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id
>>>>>>>>>>>>> = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer
>>>>>>>>>>>>> = class org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Apologies again for dumping almost everything here :-) Any
>>>>>>>>>>>>> pointers on what might be the issue are appreciated.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Sumeet
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, can you try sending messages back to Kafka (or another distributed
>>>>>>>>>>>>>> system like GCS) instead of just printing them? (Given that multi-language
>>>>>>>>>>>>>> pipelines run SDK containers in Docker, you might not see prints in the
>>>>>>>>>>>>>> original console, I think.)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Sumeet,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It seems like your Kafka consumer uses the LATEST offset (which is the
>>>>>>>>>>>>>>> default setting) as the start offset to read, which is 29. Do you have
>>>>>>>>>>>>>>> more than 29 records to read at that point? If the pipeline is only for
>>>>>>>>>>>>>>> testing purposes, I would recommend reading from the earliest offset to
>>>>>>>>>>>>>>> see whether you get records. You can do so by constructing your
>>>>>>>>>>>>>>> ReadFromKafka like:
>>>>>>>>>>>>>>> ReadFromKafka(
>>>>>>>>>>>>>>>             consumer_config={'bootstrap.servers':
>>>>>>>>>>>>>>> 'localhost:29092', 'auto.offset.reset':'earliest'},
>>>>>>>>>>>>>>>             topics=['test'])
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm trying out a simple example of reading data off a Kafka
>>>>>>>>>>>>>>>> topic into Apache Beam. Here's the relevant snippet:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>>>>>>     _ = (
>>>>>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>>>>>         | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>>>>>>>>>>             consumer_config={'bootstrap.servers':
>>>>>>>>>>>>>>>> 'localhost:29092'},
>>>>>>>>>>>>>>>>             topics=['test'])
>>>>>>>>>>>>>>>>         | 'Print' >> beam.Map(print))
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Using the above Beam pipeline snippet, I don't see any messages coming
>>>>>>>>>>>>>>>> in. Kafka is running locally in a Docker container, and I'm able to use
>>>>>>>>>>>>>>>> `kafkacat` from the host (outside the container) to publish and subscribe
>>>>>>>>>>>>>>>> to messages. So, I guess there are no issues on that front.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It appears that Beam is able to connect to Kafka and get notified of new
>>>>>>>>>>>>>>>> messages, as I see the offset changes in the Beam logs as I publish data
>>>>>>>>>>>>>>>> from `kafkacat`:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>>>>>>> timestamp {
>>>>>>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>>>>>>   nanos: 534000000
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>> message: "[Consumer
>>>>>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to 
>>>>>>>>>>>>>>>> LATEST offset
>>>>>>>>>>>>>>>> of partition test-0"
>>>>>>>>>>>>>>>> log_location:
>>>>>>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>>>>>>> thread: "22"
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>>>>>>> timestamp {
>>>>>>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>>>>>>   nanos: 537000000
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>> message: "[Consumer
>>>>>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Resetting 
>>>>>>>>>>>>>>>> offset for
>>>>>>>>>>>>>>>> partition test-0 to offset 29."
>>>>>>>>>>>>>>>> log_location:
>>>>>>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>>>>>>> thread: "22"
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is how I'm publishing data using `kafkacat`:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>>>>>>>>>>>>>>> 1:foo
>>>>>>>>>>>>>>>> 1:bar
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> and I can confirm that it's being received, again using `kafkacat`:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value:
>>>>>>>>>>>>>>>> %s\n'
>>>>>>>>>>>>>>>> Key: 1 Value: foo
>>>>>>>>>>>>>>>> Key: 1 Value: bar
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> But despite this, I don't see the actual messages being printed by Beam
>>>>>>>>>>>>>>>> as I expected. Any pointers to what's missing here are appreciated. I
>>>>>>>>>>>>>>>> suspect this could be a decoding issue on the Beam pipeline side, but I
>>>>>>>>>>>>>>>> could be incorrect.
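>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If it is indeed a decoding issue: with the default deserializers the
>>>>>>>>>>>>>>>> elements should arrive as (key, value) tuples of bytes, so I assume I'd
>>>>>>>>>>>>>>>> need an explicit decode step, roughly like this (untested):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>>>>>>     _ = (
>>>>>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>>>>>         | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>>>>>>>>>>             consumer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>>>>>>>>>>             topics=['test'])
>>>>>>>>>>>>>>>>         # Decode the raw key/value bytes before printing.
>>>>>>>>>>>>>>>>         | 'Decode' >> beam.Map(
>>>>>>>>>>>>>>>>             lambda kv: (kv[0].decode('utf-8') if kv[0] else None,
>>>>>>>>>>>>>>>>                         kv[1].decode('utf-8')))
>>>>>>>>>>>>>>>>         | 'Print' >> beam.Map(print))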
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks in advance for any pointers!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Sumeet
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
