Hi Sumeet,

If your data volume is small and you are happy to read it in batch mode,
one workaround when using ReadFromKafka is to specify max_num_records or
max_read_time:
https://github.com/apache/beam/blob/8b759d1eb0dd749b5c228bc1366a1d795f2dda6e/sdks/python/apache_beam/io/kafka.py#L133-L134
.
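
For illustration, a bounded variant of the pipeline from earlier in this
thread could look roughly like the sketch below. The topic name, bootstrap
servers and the limits are only placeholders, and max_read_time is (if I
remember correctly) given in seconds:

~~~~~~~~
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

def run_bounded(pipeline_args):
  pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)
  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from kafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': 'localhost:29092',
                'auto.offset.reset': 'earliest'},
            topics=['in_topic'],
            # Stop after 100 records or 60 seconds, whichever comes first,
            # so the source is treated as bounded.
            max_num_records=100,
            max_read_time=60)
        | 'Print' >> beam.Map(print))
~~~~~~~~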

On Wed, Mar 17, 2021 at 6:40 PM Ahmet Altay <[email protected]> wrote:

>
>
> On Wed, Mar 17, 2021 at 10:23 AM Chamikara Jayalath <[email protected]>
> wrote:
>
>> IIUC,  currently Splittable DoFn (source framework) does not work for
>> portable runners in streaming mode due to the issue Boyuan mentioned.
>>
>> On Tue, Mar 16, 2021 at 8:35 PM Sumeet Malhotra <
>> [email protected]> wrote:
>>
>>> Thanks Cham. In the Python version, I do specify the streaming option as
>>> follows (not on the command line though):
>>>
>>> pipeline_options = PipelineOptions(pipeline_args,
>>> save_main_session=True, streaming=True)
>>>
>>> Regarding running portable pipelines, just to confirm: are you saying that,
>>> until the issue you created is resolved, the only way to execute this is
>>> currently in Java?
>>>
>>
>> Yes, I think Java worked since it did not use portable Spark/Flink, but a
>> cross-language transform would require this.
>>
>> Thanks,
>> Cham
>>
>>
>>>
>>> Thanks,
>>> Sumeet
>>>
>>>
>>> On Wed, Mar 17, 2021 at 5:38 AM Boyuan Zhang <[email protected]> wrote:
>>>
>>>> Hi Sumeet,
>>>>
>>>> After double-checking the current support status, the root cause is
>>>> that when you use cross-language pipelines, your pipeline actually runs
>>>> in the portable way[1]. Currently, processing unbounded sources on Flink
>>>> over portable execution is not well supported. I have filed
>>>> https://issues.apache.org/jira/browse/BEAM-11998 to track the progress.
>>>>
>>>> [1] https://s.apache.org/beam-fn-api
>>>>
>>>>
>>>> On Tue, Mar 16, 2021 at 10:13 AM Boyuan Zhang <[email protected]>
>>>> wrote:
>>>>
>>>>> And one more question: did you launch your pipeline with the streaming=True
>>>>> pipeline option? I think you need to use --streaming=True to have the
>>>>> unbounded source work properly.
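>>>>>
>>>>> For example, a hypothetical launch command could look like the following
>>>>> (the script name and the other flags are just placeholders mirroring the
>>>>> ones used elsewhere in this thread; since streaming is a boolean option,
>>>>> the bare --streaming flag should work as well):
>>>>>
>>>>> python pipeline.py --runner=FlinkRunner --streaming \
>>>>>     --bootstrap_servers=localhost:29092 --in_topic=in_topic \
>>>>>     --out_topic=out_topic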
>>>>>
>>>>> On Tue, Mar 16, 2021 at 9:41 AM Boyuan Zhang <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Sumeet,
>>>>>>
>>>>>> Which Beam version are you using for your pipeline?
>>>>>>
>>>>>> On Mon, Mar 15, 2021 at 11:41 PM Chamikara Jayalath <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> I don't believe Fn API DirectRunner supports streaming yet (I might
>>>>>>> be wrong). I can confirm that this works for Dataflow.
>>>>>>>
>>>>>>
> You are right about DirectRunner. And this is tracked in
> https://issues.apache.org/jira/browse/BEAM-7514. (/cc +Pablo Estrada
> <[email protected]>)
>
>
>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>> On Mon, Mar 15, 2021 at 11:37 PM Sumeet Malhotra <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Thanks Cham! But I don't think this is Flink specific. I have
>>>>>>>> observed similar behaviour with DirectRunner as well BTW.
>>>>>>>>
>>>>>>>> ..Sumeet
>>>>>>>>
>>>>>>>> On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> I'm not too familiar with Flink but it seems like, for streaming
>>>>>>>>> pipelines, messages from the Kafka/SDF read do not get pushed to
>>>>>>>>> subsequent steps for some reason.
>>>>>>>>> * X-lang Bounded read with Flink seems to be fine.
>>>>>>>>> * X-lang Kafka sink with Flink seems to be fine.
>>>>>>>>>
>>>>>>>>> Created https://issues.apache.org/jira/browse/BEAM-11991 for
>>>>>>>>> tracking.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Cham
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Cham,
>>>>>>>>>>
>>>>>>>>>> Do you have pointers on what might be going on? Or something else
>>>>>>>>>> I can try? I had posted the same on StackOverflow [1], it seems that 
>>>>>>>>>> I'm
>>>>>>>>>> not the only one seeing this issue at the moment.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Sumeet
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Took me some time to set up the Java test (using Java after more
>>>>>>>>>>> than a decade!), but yes, a similar pipeline with KafkaIO and Flink
>>>>>>>>>>> seems to work fine.
>>>>>>>>>>>
>>>>>>>>>>> Here's the relevant Java code. The only difference from the
>>>>>>>>>>> Python version is that I had to extract the KV from the KafkaRecord 
>>>>>>>>>>> object
>>>>>>>>>>> and construct a PCollection<KV> explicitly before writing to the 
>>>>>>>>>>> output
>>>>>>>>>>> topic.
>>>>>>>>>>>
>>>>>>>>>>> ~~~~~~~~
>>>>>>>>>>> package org.apache.beam.kafka.test;
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>>>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaRecord;
>>>>>>>>>>> import org.apache.beam.sdk.options.Default;
>>>>>>>>>>> import org.apache.beam.sdk.options.Description;
>>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>>> import org.apache.beam.sdk.transforms.*;
>>>>>>>>>>> import org.apache.beam.sdk.values.KV;
>>>>>>>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>>>>>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>>>>>>>>>>
>>>>>>>>>>> public class KafkaTest {
>>>>>>>>>>>
>>>>>>>>>>>   static final String BOOTSTRAP_SERVERS = "localhost:29092"; // Default bootstrap kafka servers
>>>>>>>>>>>   static final String INPUT_TOPIC = "in_topic"; // Default input kafka topic name
>>>>>>>>>>>   static final String OUTPUT_TOPIC = "out_topic"; // Default output kafka topic name
>>>>>>>>>>>
>>>>>>>>>>>   /** Specific pipeline options. */
>>>>>>>>>>>   public interface KafkaTestOptions extends PipelineOptions {
>>>>>>>>>>>     @Description("Kafka bootstrap servers")
>>>>>>>>>>>     @Default.String(BOOTSTRAP_SERVERS)
>>>>>>>>>>>     String getBootstrap();
>>>>>>>>>>>
>>>>>>>>>>>     void setBootstrap(String value);
>>>>>>>>>>>
>>>>>>>>>>>     @Description("Kafka input topic name")
>>>>>>>>>>>     @Default.String(INPUT_TOPIC)
>>>>>>>>>>>     String getInputTopic();
>>>>>>>>>>>
>>>>>>>>>>>     void setInputTopic(String value);
>>>>>>>>>>>
>>>>>>>>>>>     @Description("Kafka output topic name")
>>>>>>>>>>>     @Default.String(OUTPUT_TOPIC)
>>>>>>>>>>>     String getOutputTopic();
>>>>>>>>>>>
>>>>>>>>>>>     void setOutputTopic(String value);
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   public static final void main(String[] args) throws Exception {
>>>>>>>>>>>     final KafkaTestOptions options =
>>>>>>>>>>>         PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);
>>>>>>>>>>>
>>>>>>>>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>>     pipeline
>>>>>>>>>>>         .apply(
>>>>>>>>>>>             "ReadFromKafka",
>>>>>>>>>>>             KafkaIO.<String, String>read()
>>>>>>>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>>>>>>>                 .withTopic(options.getInputTopic())
>>>>>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>>>>>>>         .apply(
>>>>>>>>>>>             "PrepareForWriting",
>>>>>>>>>>>             ParDo.of(
>>>>>>>>>>>                 new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
>>>>>>>>>>>                   @ProcessElement
>>>>>>>>>>>                   public void processElement(ProcessContext c) throws Exception {
>>>>>>>>>>>                     c.output(KV.of(c.element().getKV().getKey(),
>>>>>>>>>>>                         c.element().getKV().getValue()));
>>>>>>>>>>>                   }
>>>>>>>>>>>                 }))
>>>>>>>>>>>         .apply(
>>>>>>>>>>>             "WriteToKafka",
>>>>>>>>>>>             KafkaIO.<String, String>write()
>>>>>>>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>>>>>>>                 .withTopic(options.getOutputTopic())
>>>>>>>>>>>                 .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
>>>>>>>>>>>                 .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class));
>>>>>>>>>>>
>>>>>>>>>>>     pipeline.run();
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>> ~~~~~~~~~
>>>>>>>>>>>
>>>>>>>>>>> I'm firing the Java version as follows:
>>>>>>>>>>>
>>>>>>>>>>> $ mvn exec:java \
>>>>>>>>>>>     -Dexec.mainClass=org.apache.beam.kafka.test.KafkaTest \
>>>>>>>>>>>     -Pflink-runner \
>>>>>>>>>>>     -Dexec.args="--runner=FlinkRunner"
>>>>>>>>>>>
>>>>>>>>>>> And I can see in real time, that as I publish records to the
>>>>>>>>>>> in_topic, the out_topic is able to receive them on a continuous 
>>>>>>>>>>> basis.
>>>>>>>>>>>
>>>>>>>>>>> I hope this helps narrow down the issue.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Sumeet
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Are you able to run a similar Java streaming pipeline using
>>>>>>>>>>>> KafkaIO and Flink ? (without x-lang)
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Cham
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Cham!
>>>>>>>>>>>>>
>>>>>>>>>>>>> So finally I was able to get partial success. Since I had
>>>>>>>>>>>>> pre-populated the Kafka topic (in_topic) with 3 records, I set
>>>>>>>>>>>>> max_num_records=3 to see if it can read all existing records, as 
>>>>>>>>>>>>> follows:
>>>>>>>>>>>>>
>>>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>>>     _ = (
>>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>>>>>>             consumer_config={
>>>>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>>>>>>>                 'auto.offset.reset': 'earliest'},
>>>>>>>>>>>>>             topics=[in_topic],
>>>>>>>>>>>>>             max_num_records=3)
>>>>>>>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>>>>>             producer_config={
>>>>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers},
>>>>>>>>>>>>>             topic=out_topic))
>>>>>>>>>>>>>
>>>>>>>>>>>>> I was able to see all 3 records being read, and written
>>>>>>>>>>>>> successfully to the out_topic as well. So, it appears that there 
>>>>>>>>>>>>> might be
>>>>>>>>>>>>> some issue with reading unbounded Kafka streams here? Or is there 
>>>>>>>>>>>>> any
>>>>>>>>>>>>> setting that I might be missing?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Sumeet
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey Cham!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Appreciate the response. I tried out your suggestions
>>>>>>>>>>>>>> (details below), but I still don't see any data being consumed 
>>>>>>>>>>>>>> or written
>>>>>>>>>>>>>> back to Kafka (as per your suggestion). I'm also providing 
>>>>>>>>>>>>>> additional
>>>>>>>>>>>>>> details/context that might help narrow down the issue. Apologies 
>>>>>>>>>>>>>> for being
>>>>>>>>>>>>>> a bit verbose from hereon!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> First, here's what my pipeline code looks like now:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ~~~~~~
>>>>>>>>>>>>>> import argparse
>>>>>>>>>>>>>> import logging
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> import apache_beam as beam
>>>>>>>>>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>>>>>>>>> from apache_beam.io.kafka import WriteToKafka
>>>>>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>>>>>>>>>>>>>   pipeline_options = PipelineOptions(
>>>>>>>>>>>>>>       pipeline_args, save_main_session=True, streaming=True)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   logging.info(
>>>>>>>>>>>>>>       'Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
>>>>>>>>>>>>>>       str(bootstrap_servers), in_topic, out_topic)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>>>>     _ = (
>>>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>>>>>>>             consumer_config={
>>>>>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>>>>>>>>                 'auto.offset.reset': 'earliest'
>>>>>>>>>>>>>>             },
>>>>>>>>>>>>>>             topics=[in_topic])
>>>>>>>>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>>>>>>             producer_config={
>>>>>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers
>>>>>>>>>>>>>>             },
>>>>>>>>>>>>>>             topic=out_topic))
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> if __name__ == '__main__':
>>>>>>>>>>>>>>   logging.getLogger().setLevel(logging.INFO)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   parser = argparse.ArgumentParser()
>>>>>>>>>>>>>>   parser.add_argument(
>>>>>>>>>>>>>>       '--bootstrap_servers',
>>>>>>>>>>>>>>       dest='bootstrap_servers',
>>>>>>>>>>>>>>       required=True,
>>>>>>>>>>>>>>       help='Bootstrap servers for the Kafka cluster')
>>>>>>>>>>>>>>   parser.add_argument(
>>>>>>>>>>>>>>       '--in_topic',
>>>>>>>>>>>>>>       dest='in_topic',
>>>>>>>>>>>>>>       required=True,
>>>>>>>>>>>>>>       help='Kafka topic to read data from')
>>>>>>>>>>>>>>   parser.add_argument(
>>>>>>>>>>>>>>       '--out_topic',
>>>>>>>>>>>>>>       dest='out_topic',
>>>>>>>>>>>>>>       required=True,
>>>>>>>>>>>>>>       help='Kafka topic to write data to')
>>>>>>>>>>>>>>   known_args, pipeline_args = parser.parse_known_args()
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   run(known_args.bootstrap_servers, known_args.in_topic,
>>>>>>>>>>>>>>       known_args.out_topic, pipeline_args)
>>>>>>>>>>>>>> ~~~~~
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm firing this pipeline as follows:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> python ./pipeline.py --bootstrap_servers=localhost:29092
>>>>>>>>>>>>>> --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have pre-populated the Kafka topic with 3 records:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t in_topic
>>>>>>>>>>>>>> v1
>>>>>>>>>>>>>> v2
>>>>>>>>>>>>>> v3
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Now, when I execute the pipeline, I see that it starts to
>>>>>>>>>>>>>> read records from offset 0, but then seeks to the latest offset 
>>>>>>>>>>>>>> 3 without
>>>>>>>>>>>>>> processing the records. I don't see any data written to 
>>>>>>>>>>>>>> out_topic. I
>>>>>>>>>>>>>> filtered out the logs a bit, and this is what I'm seeing:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> INFO:root:Starting data pipeline.
>>>>>>>>>>>>>> bootstrap_servers=localhost:29092 in_topic=in_topic 
>>>>>>>>>>>>>> out_topic=out_topic
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions
>>>>>>>>>>>>>> assigned to split 0 (total 1): in_topic-0'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>>>>>>> clientId=consumer-2, groupId=null] Subscribed to partition(s): 
>>>>>>>>>>>>>> in_topic-0'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>>>>>>> clientId=consumer-2, groupId=null] Resetting offset for partition
>>>>>>>>>>>>>> in_topic-0 to offset 0.'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0:
>>>>>>>>>>>>>> reading from in_topic-0 starting at offset 0'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to
>>>>>>>>>>>>>> partition(s): in_topic-0'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Seeking to 
>>>>>>>>>>>>>> LATEST offset
>>>>>>>>>>>>>> of partition in_topic-0'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>>>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>>>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1947524890_none] Resetting 
>>>>>>>>>>>>>> offset for
>>>>>>>>>>>>>> partition in_topic-0 to offset 3.'
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Additionally, the logs also emit complete consumer and
>>>>>>>>>>>>>> producer configs. I'm dumping them here, in case that helps:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Consumer Config:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO:
>>>>>>>>>>>>>> ConsumerConfig values:'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics
>>>>>>>>>>>>>> = true'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> tauto.commit.interval.ms = 5000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset
>>>>>>>>>>>>>> = earliest'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers
>>>>>>>>>>>>>> = [localhost:29092]'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs =
>>>>>>>>>>>>>> true'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup
>>>>>>>>>>>>>> = default'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack ='
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> tconnections.max.idle.ms = 540000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> tdefault.api.timeout.ms = 60000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit
>>>>>>>>>>>>>> = false'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics
>>>>>>>>>>>>>> = true'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes
>>>>>>>>>>>>>> = 52428800'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> tfetch.max.wait.ms = 500'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes
>>>>>>>>>>>>>> = 1'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id =
>>>>>>>>>>>>>> Reader-0_offset_consumer_1947524890_none'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> tgroup.instance.id = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> theartbeat.interval.ms = 3000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes
>>>>>>>>>>>>>> = []'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close
>>>>>>>>>>>>>> = true'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level
>>>>>>>>>>>>>> = read_uncommitted'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer
>>>>>>>>>>>>>> = class 
>>>>>>>>>>>>>> org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes
>>>>>>>>>>>>>> = 1048576'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> tmax.poll.interval.ms = 300000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records
>>>>>>>>>>>>>> = 500'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> tmetadata.max.age.ms = 300000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters
>>>>>>>>>>>>>> = []'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples
>>>>>>>>>>>>>> = 2'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level
>>>>>>>>>>>>>> = INFO'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> tmetrics.sample.window.ms = 30000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy
>>>>>>>>>>>>>> = [class org.apache.kafka.clients.consumer.RangeAssignor]'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes
>>>>>>>>>>>>>> = 65536'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> treconnect.backoff.max.ms = 1000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> treconnect.backoff.ms = 50'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> trequest.timeout.ms = 30000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms
>>>>>>>>>>>>>> = 100'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd
>>>>>>>>>>>>>> = /usr/bin/kinit'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>>>>>>>>>>>>> = 60000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> tsasl.kerberos.service.name = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>>>>>>>>>>>>> = 0.05'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>>>>>>>>>>>>> = 0.8'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>>>>>>>>>>>>> = 300'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>>>>>>>>>>>>> = 60'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>>>>>>>>>>>>> = 0.8'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>>>>>>>>>>>>> = 0.05'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism =
>>>>>>>>>>>>>> GSSAPI'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol
>>>>>>>>>>>>>> = PLAINTEXT'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes
>>>>>>>>>>>>>> = 131072'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> tsession.timeout.ms = 10000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols
>>>>>>>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>>>>>>>>>>>>> = https'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm
>>>>>>>>>>>>>> = SunX509'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type
>>>>>>>>>>>>>> = JKS'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol =
>>>>>>>>>>>>>> TLS'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider =
>>>>>>>>>>>>>> null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>>>>>>>>>>>>> = PKIX'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type
>>>>>>>>>>>>>> = JKS'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer
>>>>>>>>>>>>>> = class 
>>>>>>>>>>>>>> org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Producer Config:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO:
>>>>>>>>>>>>>> ProducerConfig values:'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size =
>>>>>>>>>>>>>> 16384'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers
>>>>>>>>>>>>>> = [localhost:29092]'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory =
>>>>>>>>>>>>>> 33554432'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup
>>>>>>>>>>>>>> = default'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type
>>>>>>>>>>>>>> = none'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> tconnections.max.idle.ms = 540000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> tdelivery.timeout.ms = 120000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence
>>>>>>>>>>>>>> = false'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes
>>>>>>>>>>>>>> = []'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer =
>>>>>>>>>>>>>> class org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms =
>>>>>>>>>>>>>> 60000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection
>>>>>>>>>>>>>> = 5'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size
>>>>>>>>>>>>>> = 1048576'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> tmetadata.max.age.ms = 300000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters
>>>>>>>>>>>>>> = []'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples
>>>>>>>>>>>>>> = 2'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level
>>>>>>>>>>>>>> = INFO'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> tmetrics.sample.window.ms = 30000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class
>>>>>>>>>>>>>> = class 
>>>>>>>>>>>>>> org.apache.kafka.clients.producer.internals.DefaultPartitioner'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes
>>>>>>>>>>>>>> = 32768'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> treconnect.backoff.max.ms = 1000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> treconnect.backoff.ms = 50'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> trequest.timeout.ms = 30000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms
>>>>>>>>>>>>>> = 100'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd
>>>>>>>>>>>>>> = /usr/bin/kinit'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>>>>>>>>>>>>>> = 60000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> tsasl.kerberos.service.name = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>>>>>>>>>>>>>> = 0.05'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>>>>>>>>>>>>>> = 0.8'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>>>>>>>>>>>>>> = 300'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>>>>>>>>>>>>>> = 60'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>>>>>>>>>>>>>> = 0.8'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>>>>>>>>>>>>>> = 0.05'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism =
>>>>>>>>>>>>>> GSSAPI'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol
>>>>>>>>>>>>>> = PLAINTEXT'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes
>>>>>>>>>>>>>> = 131072'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols
>>>>>>>>>>>>>> = [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>>>>>>>>>>>>>> = https'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm
>>>>>>>>>>>>>> = SunX509'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type
>>>>>>>>>>>>>> = JKS'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol =
>>>>>>>>>>>>>> TLS'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider =
>>>>>>>>>>>>>> null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm
>>>>>>>>>>>>>> = PKIX'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type
>>>>>>>>>>>>>> = JKS'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\
>>>>>>>>>>>>>> ttransaction.timeout.ms = 60000'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id
>>>>>>>>>>>>>> = null'
>>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer
>>>>>>>>>>>>>> = class 
>>>>>>>>>>>>>> org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Apologies again for dumping almost everything here :-) Any
>>>>>>>>>>>>>> pointers on what might be the issue are appreciated.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Sumeet
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Also, can you try sending messages back to Kafka (or another
>>>>>>>>>>>>>>> distributed system like GCS) instead of just printing them?
>>>>>>>>>>>>>>> (Given that multi-language pipelines run SDK containers in
>>>>>>>>>>>>>>> Docker, you might not see prints in the original console, I
>>>>>>>>>>>>>>> think.)
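>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> As a rough sketch (not tested; topic names and servers are just
>>>>>>>>>>>>>>> placeholders), replacing the print step with a Kafka sink could
>>>>>>>>>>>>>>> look like this. WriteToKafka should accept the same (key, value)
>>>>>>>>>>>>>>> byte pairs that ReadFromKafka emits:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ~~~~~~~~
>>>>>>>>>>>>>>> import apache_beam as beam
>>>>>>>>>>>>>>> from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
>>>>>>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> # Hypothetical echo pipeline: read from one topic and write the
>>>>>>>>>>>>>>> # records back to another topic instead of printing them.
>>>>>>>>>>>>>>> options = PipelineOptions(streaming=True)
>>>>>>>>>>>>>>> with beam.Pipeline(options=options) as pipeline:
>>>>>>>>>>>>>>>   _ = (
>>>>>>>>>>>>>>>       pipeline
>>>>>>>>>>>>>>>       | ReadFromKafka(
>>>>>>>>>>>>>>>           consumer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>>>>>>>>>           topics=['test'])
>>>>>>>>>>>>>>>       | WriteToKafka(
>>>>>>>>>>>>>>>           producer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>>>>>>>>>           topic='test_out'))
>>>>>>>>>>>>>>> ~~~~~~~~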
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Sumeet,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It seems like your Kafka consumer uses the LATEST offset
>>>>>>>>>>>>>>>> (which is the default setting) as the start offset to read,
>>>>>>>>>>>>>>>> which is 29. Do you have more than 29 records to read at that
>>>>>>>>>>>>>>>> point? If the pipeline is only for testing purposes, I would
>>>>>>>>>>>>>>>> recommend reading from the earliest offset to see whether you
>>>>>>>>>>>>>>>> get records. You can do so by constructing your ReadFromKafka
>>>>>>>>>>>>>>>> like:
>>>>>>>>>>>>>>>> ReadFromKafka(
>>>>>>>>>>>>>>>>             consumer_config={'bootstrap.servers':
>>>>>>>>>>>>>>>> 'localhost:29092', 'auto.offset.reset':'earliest'},
>>>>>>>>>>>>>>>>             topics=['test'])
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm trying out a simple example of reading data off a
>>>>>>>>>>>>>>>>> Kafka topic into Apache Beam. Here's the relevant snippet:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>>>>>>>     _ = (
>>>>>>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>>>>>>         | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>>>>>>>>>>>             consumer_config={'bootstrap.servers':
>>>>>>>>>>>>>>>>> 'localhost:29092'},
>>>>>>>>>>>>>>>>>             topics=['test'])
>>>>>>>>>>>>>>>>>         | 'Print' >> beam.Map(print))
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Using the above Beam pipeline snippet, I don't see any
>>>>>>>>>>>>>>>>> messages coming in. Kafka is running locally in a docker 
>>>>>>>>>>>>>>>>> container, and I'm
>>>>>>>>>>>>>>>>> able to use `kafkacat` from the host (outside the container) 
>>>>>>>>>>>>>>>>> to publish and
>>>>>>>>>>>>>>>>> subscribe to messages. So, I guess there are no issues on 
>>>>>>>>>>>>>>>>> that front.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It appears that Beam is able to connect to Kafka and get
>>>>>>>>>>>>>>>>> notified of new messages, as I see the offset changes in the 
>>>>>>>>>>>>>>>>> Beam logs as I
>>>>>>>>>>>>>>>>> publish data from `kafkacat`:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>>>>>>>> timestamp {
>>>>>>>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>>>>>>>   nanos: 534000000
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>> message: "[Consumer
>>>>>>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to 
>>>>>>>>>>>>>>>>> LATEST offset
>>>>>>>>>>>>>>>>> of partition test-0"
>>>>>>>>>>>>>>>>> log_location:
>>>>>>>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>>>>>>>> thread: "22"
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>>>>>>>> timestamp {
>>>>>>>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>>>>>>>   nanos: 537000000
>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>> message: "[Consumer
>>>>>>>>>>>>>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>>>>>>>>>>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Resetting 
>>>>>>>>>>>>>>>>> offset for
>>>>>>>>>>>>>>>>> partition test-0 to offset 29."
>>>>>>>>>>>>>>>>> log_location:
>>>>>>>>>>>>>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>>>>>>>> thread: "22"
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This is how I'm publishing data using `kafkacat`:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>>>>>>>>>>>>>>>> 1:foo
>>>>>>>>>>>>>>>>> 1:bar
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> and I can confirm that it's being received, again using
>>>>>>>>>>>>>>>>> `kafkacat`:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k
>>>>>>>>>>>>>>>>> Value: %s\n'
>>>>>>>>>>>>>>>>> Key: 1 Value: foo
>>>>>>>>>>>>>>>>> Key: 1 Value: bar
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> But despite this, I don't see the actual messages being
>>>>>>>>>>>>>>>>> printed by Beam as I expected. Any pointers to what's missing
>>>>>>>>>>>>>>>>> here are appreciated. I suspect this could be a decoding issue
>>>>>>>>>>>>>>>>> on the Beam pipeline side, but I could be wrong.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks in advance for any pointers!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>> Sumeet
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
