Are you able to run a similar Java streaming pipeline using KafkaIO and
Flink (without x-lang)?

Thanks,
Cham

On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <sumeet.malho...@gmail.com>
wrote:

> Hi Cham!
>
> So finally I was able to get partial success. Since I had pre-populated
> the Kafka topic (in_topic) with 3 records, I set max_num_records=3 to see
> if it can read all existing records, as follows:
>
>   with beam.Pipeline(options=pipeline_options) as pipeline:
>     _ = (
>         pipeline
>         | 'Read from kafka' >> ReadFromKafka(
>             consumer_config={
>                 'bootstrap.servers': bootstrap_servers,
>                 'auto.offset.reset': 'earliest'},
>             topics=[in_topic],
>             max_num_records=3)
>         | 'Write to kafka' >> WriteToKafka(
>             producer_config={
>                 'bootstrap.servers': bootstrap_servers},
>             topic=out_topic))
>
> I was able to see all 3 records being read, and written successfully to
> the out_topic as well. So, it appears that there might be some issue with
> reading unbounded Kafka streams here? Or is there any setting that I might
> be missing?
>
> Thanks,
> Sumeet
>
>
> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <sumeet.malho...@gmail.com>
> wrote:
>
>> Hey Cham!
>>
>> Appreciate the response. I tried out your suggestions (details below),
>> but I still don't see any data being consumed or written back to Kafka (as
>> per your suggestion). I'm also providing additional details/context that
>> might help narrow down the issue. Apologies for being a bit verbose from
>> here on!
>>
>> First, here's what my pipeline code looks like now:
>>
>> ~~~~~~
>> import logging
>>
>> import apache_beam as beam
>> from apache_beam.io.kafka import ReadFromKafka
>> from apache_beam.io.kafka import WriteToKafka
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>   pipeline_options = PipelineOptions(
>>       pipeline_args, save_main_session=True, streaming=True)
>>
>>   logging.info(
>>       'Starting data pipeline. bootstrap_servers=%s in_topic=%s '
>>       'out_topic=%s',
>>       str(bootstrap_servers), in_topic, out_topic)
>>
>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>     _ = (
>>         pipeline
>>         | 'Read from kafka' >> ReadFromKafka(
>>             consumer_config={
>>                 'bootstrap.servers': bootstrap_servers,
>>                 'auto.offset.reset': 'earliest'
>>             },
>>             topics=[in_topic])
>>         | 'Write to kafka' >> WriteToKafka(
>>             producer_config={
>>                 'bootstrap.servers': bootstrap_servers
>>             },
>>             topic=out_topic))
>>
>> if __name__ == '__main__':
>>   logging.getLogger().setLevel(logging.INFO)
>>   import argparse
>>
>>   parser = argparse.ArgumentParser()
>>   parser.add_argument(
>>       '--bootstrap_servers',
>>       dest='bootstrap_servers',
>>       required=True,
>>       help='Bootstrap servers for the Kafka cluster')
>>   parser.add_argument(
>>       '--in_topic',
>>       dest='in_topic',
>>       required=True,
>>       help='Kafka topic to read data from')
>>   parser.add_argument(
>>       '--out_topic',
>>       dest='out_topic',
>>       required=True,
>>       help='Kafka topic to write data to')
>>   known_args, pipeline_args = parser.parse_known_args()
>>
>>   run(known_args.bootstrap_servers, known_args.in_topic,
>>       known_args.out_topic, pipeline_args)
>> ~~~~~
>>
>> I'm firing this pipeline as follows:
>>
>> python ./pipeline.py --bootstrap_servers=localhost:29092 \
>>     --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>
>> I have pre-populated the Kafka topic with 3 records:
>>
>> $ kafkacat -C -b localhost:29092 -t in_topic
>> v1
>> v2
>> v3
>>
>> Now, when I execute the pipeline, I see that it starts to read records
>> from offset 0, but then seeks to the latest offset 3 without processing the
>> records. I don't see any data written to out_topic. I filtered out the logs
>> a bit, and this is what I'm seeing:
>>
>> INFO:root:Starting data pipeline. bootstrap_servers=localhost:29092
>> in_topic=in_topic out_topic=out_topic
>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions assigned to
>> split 0 (total 1): in_topic-0'
>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>> clientId=consumer-2, groupId=null] Subscribed to partition(s): in_topic-0'
>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>> clientId=consumer-2, groupId=null] Resetting offset for partition
>> in_topic-0 to offset 0.'
>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading from
>> in_topic-0 starting at offset 0'
>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>> groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to
>> partition(s): in_topic-0'
>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>> groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST offset
>> of partition in_topic-0'
>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer
>> clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
>> groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for
>> partition in_topic-0 to offset 3.'
>>
>> Additionally, the logs also emit complete consumer and producer configs.
>> I'm dumping them here, in case that helps:
>>
>> Consumer Config:
>>
>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig values:'
>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics =
>> true'
>> INFO:apache_beam.utils.subprocess_server:b'\tauto.commit.interval.ms =
>> 5000'
>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset = earliest'
>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
>> [localhost:29092]'
>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true'
>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = default'
>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack ='
>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms =
>> 540000'
>> INFO:apache_beam.utils.subprocess_server:b'\tdefault.api.timeout.ms =
>> 60000'
>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit = false'
>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics =
>> true'
>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes = 52428800'
>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms = 500'
>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1'
>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id =
>> Reader-0_offset_consumer_1947524890_none'
>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id = null'
>> INFO:apache_beam.utils.subprocess_server:b'\theartbeat.interval.ms =
>> 3000'
>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []'
>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close
>> = true'
>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level =
>> read_uncommitted'
>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer'
>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes =
>> 1048576'
>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms =
>> 300000'
>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = 500'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms =
>> 300000'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level =
>> INFO'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms =
>> 30000'
>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy
>> = [class org.apache.kafka.clients.consumer.RangeAssignor]'
>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = 65536'
>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms =
>> 1000'
>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = 50'
>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = 30000'
>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>> = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd =
>> /usr/bin/kinit'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>> = 60000'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name
>> = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>> = 0.05'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>> = 0.8'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>> = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>> = 300'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>> = 60'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>> = 0.8'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>> = 0.05'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol =
>> PLAINTEXT'
>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = 131072'
>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms = 10000'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols =
>> [TLSv1.2, TLSv1.1, TLSv1]'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>> = https'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm =
>> SunX509'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>> = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm =
>> PKIX'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location =
>> null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password =
>> null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer = class
>> org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>
>> Producer Config:
>>
>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig values:'
>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1'
>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384'
>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers =
>> [localhost:29092]'
>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory = 33554432'
>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = default'
>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type = none'
>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms =
>> 540000'
>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms =
>> 120000'
>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence = false'
>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []'
>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = class
>> org.apache.kafka.common.serialization.ByteArraySerializer'
>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0'
>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = 60000'
>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection
>> = 5'
>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size = 1048576'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms =
>> 300000'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level =
>> INFO'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms =
>> 30000'
>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class = class
>> org.apache.kafka.clients.producer.internals.DefaultPartitioner'
>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = 32768'
>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms =
>> 1000'
>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = 50'
>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = 30000'
>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3'
>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class
>> = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd =
>> /usr/bin/kinit'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin
>> = 60000'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name
>> = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter
>> = 0.05'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor
>> = 0.8'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class
>> = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds
>> = 300'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds
>> = 60'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor
>> = 0.8'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter
>> = 0.05'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol =
>> PLAINTEXT'
>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = 131072'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols =
>> [TLSv1.2, TLSv1.1, TLSv1]'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm
>> = https'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm =
>> SunX509'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation
>> = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm =
>> PKIX'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location =
>> null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password =
>> null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
>> INFO:apache_beam.utils.subprocess_server:b'\ttransaction.timeout.ms =
>> 60000'
>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer = class
>> org.apache.kafka.common.serialization.ByteArraySerializer'
>>
>>
>> Apologies again for dumping almost everything here :-) Any pointers on
>> what might be the issue are appreciated.
>>
>> Thanks,
>> Sumeet
>>
>>
>>
>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> Also, can you try sending messages back to Kafka (or another distributed
>>> system like GCS) instead of just printing them? (Given that multi-language
>>> pipelines run SDK containers in Docker, you might not see prints in the
>>> original console, I think.)
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <boyu...@google.com> wrote:
>>>
>>>> Hi Sumeet,
>>>>
>>>> It seems like your Kafka consumer uses the LATEST offset (which is the
>>>> default setting) as the start offset to read, which is 29. Do you have more
>>>> than 29 records to read at that point? If the pipeline is only for testing
>>>> purposes, I would recommend reading from the earliest offset to see whether
>>>> you get records. You can do so by constructing your ReadFromKafka like:
>>>> ReadFromKafka(
>>>>     consumer_config={
>>>>         'bootstrap.servers': 'localhost:29092',
>>>>         'auto.offset.reset': 'earliest'},
>>>>     topics=['test'])
>>>>
>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <
>>>> sumeet.malho...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I'm trying out a simple example of reading data off a Kafka topic into
>>>>> Apache Beam. Here's the relevant snippet:
>>>>>
>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>     _ = (
>>>>>         pipeline
>>>>>         | 'Read from Kafka' >> ReadFromKafka(
>>>>>             consumer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>             topics=['test'])
>>>>>         | 'Print' >> beam.Map(print))
>>>>>
>>>>> Using the above Beam pipeline snippet, I don't see any messages coming
>>>>> in. Kafka is running locally in a docker container, and I'm able to use
>>>>> `kafkacat` from the host (outside the container) to publish and subscribe
>>>>> to messages. So, I guess there are no issues on that front.
>>>>>
>>>>> It appears that Beam is able to connect to Kafka and get notified of
>>>>> new messages, as I see the offset changes in the Beam logs as I publish
>>>>> data from `kafkacat`:
>>>>>
>>>>> INFO:root:severity: INFO
>>>>> timestamp {
>>>>>   seconds: 1612886861
>>>>>   nanos: 534000000
>>>>> }
>>>>> message: "[Consumer
>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset
>>>>> of partition test-0"
>>>>> log_location:
>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>> thread: "22"
>>>>>
>>>>> INFO:root:severity: INFO
>>>>> timestamp {
>>>>>   seconds: 1612886861
>>>>>   nanos: 537000000
>>>>> }
>>>>> message: "[Consumer
>>>>> clientId=consumer-Reader-0_offset_consumer_1692125327_none-3,
>>>>> groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for
>>>>> partition test-0 to offset 29."
>>>>> log_location:
>>>>> "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>> thread: "22"
>>>>>
>>>>> This is how I'm publishing data using `kafkacat`:
>>>>>
>>>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>>>> 1:foo
>>>>> 1:bar
>>>>>
>>>>> and I can confirm that it's being received, again using `kafkacat`:
>>>>>
>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
>>>>> Key: 1 Value: foo
>>>>> Key: 1 Value: bar
>>>>>
>>>>> But despite this, I don't see the actual message being printed by Beam
>>>>> as I expected. Any pointers to what's missing here are appreciated. I
>>>>> suspect this could be a decoding issue on the Beam pipeline side, but I
>>>>> could be wrong.
>>>>>
>>>>> Thanks in advance for any pointers!
>>>>>
>>>>> Cheers,
>>>>> Sumeet
>>>>>
>>>>
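
A note for readers of this thread: the offsets 29, 0 and 3 reported in the
logs above are easier to follow with a small model of how the
`auto.offset.reset` consumer setting chooses a starting position when no
committed offset exists. The sketch below is plain Python, not Kafka or Beam
code. The function `resolve_start_offset` and its parameter names are
hypothetical and exist only for illustration.

```python
from typing import Optional


def resolve_start_offset(policy: str,
                         log_start: int,
                         log_end: int,
                         committed: Optional[int] = None) -> int:
    """Toy model of how a Kafka consumer picks its first offset.

    policy    -- value of 'auto.offset.reset' ('earliest', 'latest' or 'none')
    log_start -- first offset still available in the partition
    log_end   -- offset that the next produced record would get
    committed -- committed offset for the group, if any (hypothetical input)
    """
    # A valid committed offset wins; the reset policy is only a fallback.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    if policy == 'earliest':
        return log_start
    if policy == 'latest':
        return log_end
    # With 'none' the real client raises an error instead of guessing.
    raise ValueError('no usable offset and reset policy is %r' % policy)


# Numbers taken from the thread: 'test' ended at 29, 'in_topic' held 3 records.
print(resolve_start_offset('latest', 0, 29))   # first report: default policy
print(resolve_start_offset('earliest', 0, 3))  # second report: 'earliest' set
```

With the default policy the reader starts at the end of the partition, so
only records published after startup are candidates for reading. With
'earliest' it starts at the first available record.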

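The second log excerpt in the thread contains two different consumers:
`consumer-2` (groupId=null) is reset to offset 0, while the consumer whose
group id contains `_offset_consumer_` is reset to offset 3. As far as can be
told from the naming, the latter appears to be a helper consumer that tracks
the end of the partition rather than the one that reads records, so "Seeking
to LATEST" on its own may not mean that records were skipped. The sketch below
separates the two kinds of entries using only the standard library. It assumes
the quote markers (`>>`) have been stripped from the log text, and the
`_offset_consumer_` check is a heuristic based on the names seen above, not a
documented contract. `offset_resets` and `SAMPLE` are illustrative names.

```python
import re

# Matches the "Resetting offset" messages exactly as they appear in the thread.
RESET_RE = re.compile(
    r"\[Consumer clientId=(?P<client>[^,]+), groupId=(?P<group>[^\]]+)\] "
    r"Resetting offset for partition (?P<partition>\S+) "
    r"to offset (?P<offset>\d+)\."
)


def offset_resets(log_text):
    """Return one dict per 'Resetting offset' message found in log_text."""
    # Mail clients wrapped the log lines, so collapse all whitespace first.
    flat = " ".join(log_text.split())
    results = []
    for match in RESET_RE.finditer(flat):
        group = match.group("group")
        results.append({
            "client": match.group("client"),
            "partition": match.group("partition"),
            "offset": int(match.group("offset")),
            # Heuristic: helper consumers carry this marker in their group id.
            "is_offset_consumer": "_offset_consumer_" in group,
        })
    return results


SAMPLE = """
INFO: [Consumer clientId=consumer-2, groupId=null] Resetting offset for partition
in_topic-0 to offset 0.
INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3,
groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for
partition in_topic-0 to offset 3.
"""

for entry in offset_resets(SAMPLE):
    print(entry)
```

Filtering on `is_offset_consumer` leaves only the entries for the consumer
that reads records, which in the second report starts at offset 0 as requested
by 'earliest'.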