On Wed, Mar 17, 2021 at 10:23 AM Chamikara Jayalath <[email protected]> wrote:
> IIUC, currently Splittable DoFn (the source framework) does not work on
> portable runners in streaming mode, due to the issue Boyuan mentioned.
>
> On Tue, Mar 16, 2021 at 8:35 PM Sumeet Malhotra <[email protected]>
> wrote:
>
>> Thanks Cham. In the Python version, I do specify the streaming option as
>> follows (not on the command line though):
>>
>>   pipeline_options = PipelineOptions(
>>       pipeline_args, save_main_session=True, streaming=True)
>>
>> Regarding running portable pipelines, just to confirm: are you saying
>> that, until the issue you created is resolved, the only way to execute
>> this is in Java?
>>
>
> Yes, I think Java worked since it did not use portable Spark/Flink, but a
> cross-language transform would require this.
>
> Thanks,
> Cham
>
>>
>> Thanks,
>> Sumeet
>>
>> On Wed, Mar 17, 2021 at 5:38 AM Boyuan Zhang <[email protected]> wrote:
>>
>>> Hi Sumeet,
>>>
>>> After double-checking the current support status, the root cause is that
>>> when you use cross-language pipelines, your pipeline actually runs in
>>> the portable way [1]. Currently we don't support processing unbounded
>>> sources on Flink over portable execution well. I have filed
>>> https://issues.apache.org/jira/browse/BEAM-11998 to track the progress.
>>>
>>> [1] https://s.apache.org/beam-fn-api
>>>
>>> On Tue, Mar 16, 2021 at 10:13 AM Boyuan Zhang <[email protected]>
>>> wrote:
>>>
>>>> And one more question: did you launch your pipeline with the
>>>> streaming=True pipeline option? I think you need to use --streaming=True
>>>> to have an unbounded source working properly.
>>>>
>>>> On Tue, Mar 16, 2021 at 9:41 AM Boyuan Zhang <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Sumeet,
>>>>>
>>>>> Which Beam version are you using for your pipeline?
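On the command-line route mentioned above: the script quoted further down in this thread hands everything its own argparse parser does not recognise on to PipelineOptions, so a flag such as --streaming would reach Beam that way. Below is a minimal, stdlib-only sketch of that hand-off. The parser arguments mirror the quoted script; the split_args name and the argv list are made up for illustration, and nothing here touches Beam itself.

```python
import argparse


def split_args(argv):
    """Split script-specific arguments from everything else on the command line."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--bootstrap_servers', required=True)
    parser.add_argument('--in_topic', required=True)
    parser.add_argument('--out_topic', required=True)
    # parse_known_args() returns (namespace, leftovers) instead of failing
    # on flags that this parser does not declare.
    return parser.parse_known_args(argv)


# Hypothetical command line, for illustration only.
argv = [
    '--bootstrap_servers=localhost:29092',
    '--in_topic=in_topic',
    '--out_topic=out_topic',
    '--runner=FlinkRunner',
    '--streaming',
]
known_args, pipeline_args = split_args(argv)
print(known_args.in_topic)
print(pipeline_args)
```

The leftover list is what the quoted script passes as pipeline_args, so runner and streaming flags never need to be declared in the script's own parser.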
>>>>>
>>>>> On Mon, Mar 15, 2021 at 11:41 PM Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> I don't believe Fn API DirectRunner supports streaming yet (I might
>>>>>> be wrong). I can confirm that this works for Dataflow.
>>>>>>
>>>>> You are right about DirectRunner. This is tracked in
>>>>> https://issues.apache.org/jira/browse/BEAM-7514
>>>>> (/cc +Pablo Estrada <[email protected]>).
>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Mon, Mar 15, 2021 at 11:37 PM Sumeet Malhotra <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> Thanks Cham! But I don't think this is Flink specific. I have
>>>>>>> observed similar behaviour with DirectRunner as well, BTW.
>>>>>>>
>>>>>>> ..Sumeet
>>>>>>>
>>>>>>> On Tue, Mar 16, 2021 at 12:00 PM Chamikara Jayalath <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> I'm not too familiar with Flink, but it seems like, for streaming
>>>>>>>> pipelines, messages from the Kafka/SDF read do not get pushed to
>>>>>>>> subsequent steps for some reason.
>>>>>>>> * X-lang bounded read with Flink seems to be fine.
>>>>>>>> * X-lang Kafka sink with Flink seems to be fine.
>>>>>>>>
>>>>>>>> Created https://issues.apache.org/jira/browse/BEAM-11991 for
>>>>>>>> tracking.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cham
>>>>>>>>
>>>>>>>> On Mon, Mar 15, 2021 at 8:33 PM Sumeet Malhotra <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Cham,
>>>>>>>>>
>>>>>>>>> Do you have pointers on what might be going on? Or something else
>>>>>>>>> I can try? I had posted the same on StackOverflow [1]; it seems that
>>>>>>>>> I'm not the only one seeing this issue at the moment.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Sumeet
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://stackoverflow.com/questions/66151919/apache-beam-python-sdk-readfromkafka-does-not-receive-data
>>>>>>>>>
>>>>>>>>> On Fri, Mar 12, 2021 at 11:41 AM Sumeet Malhotra <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Took me some time to set up the Java test (using Java after more
>>>>>>>>>> than a decade!), but yes, a similar pipeline with KafkaIO and Flink
>>>>>>>>>> seems to work fine.
>>>>>>>>>>
>>>>>>>>>> Here's the relevant Java code. The only difference from the
>>>>>>>>>> Python version is that I had to extract the KV from the KafkaRecord
>>>>>>>>>> object and construct a PCollection<KV> explicitly before writing to
>>>>>>>>>> the output topic.
>>>>>>>>>>
>>>>>>>>>> ~~~~~~~~
>>>>>>>>>> package org.apache.beam.kafka.test;
>>>>>>>>>>
>>>>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>>>>>>>> import org.apache.beam.sdk.io.kafka.KafkaRecord;
>>>>>>>>>> import org.apache.beam.sdk.options.Default;
>>>>>>>>>> import org.apache.beam.sdk.options.Description;
>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>>>>> import org.apache.beam.sdk.transforms.*;
>>>>>>>>>> import org.apache.beam.sdk.values.KV;
>>>>>>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>>>>>> import org.apache.kafka.common.serialization.StringDeserializer;
>>>>>>>>>>
>>>>>>>>>> public class KafkaTest {
>>>>>>>>>>
>>>>>>>>>>   static final String BOOTSTRAP_SERVERS = "localhost:29092"; // Default bootstrap kafka servers
>>>>>>>>>>   static final String INPUT_TOPIC = "in_topic"; // Default input kafka topic name
>>>>>>>>>>   static final String OUTPUT_TOPIC = "out_topic"; // Default output kafka topic name
>>>>>>>>>>
>>>>>>>>>>   /** Specific pipeline options. */
>>>>>>>>>>   public interface KafkaTestOptions extends PipelineOptions {
>>>>>>>>>>     @Description("Kafka bootstrap servers")
>>>>>>>>>>     @Default.String(BOOTSTRAP_SERVERS)
>>>>>>>>>>     String getBootstrap();
>>>>>>>>>>
>>>>>>>>>>     void setBootstrap(String value);
>>>>>>>>>>
>>>>>>>>>>     @Description("Kafka input topic name")
>>>>>>>>>>     @Default.String(INPUT_TOPIC)
>>>>>>>>>>     String getInputTopic();
>>>>>>>>>>
>>>>>>>>>>     void setInputTopic(String value);
>>>>>>>>>>
>>>>>>>>>>     @Description("Kafka output topic name")
>>>>>>>>>>     @Default.String(OUTPUT_TOPIC)
>>>>>>>>>>     String getOutputTopic();
>>>>>>>>>>
>>>>>>>>>>     void setOutputTopic(String value);
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   public static final void main(String[] args) throws Exception {
>>>>>>>>>>     final KafkaTestOptions options =
>>>>>>>>>>         PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaTestOptions.class);
>>>>>>>>>>
>>>>>>>>>>     Pipeline pipeline = Pipeline.create(options);
>>>>>>>>>>     pipeline
>>>>>>>>>>         .apply(
>>>>>>>>>>             "ReadFromKafka",
>>>>>>>>>>             KafkaIO.<String, String>read()
>>>>>>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>>>>>>                 .withTopic(options.getInputTopic())
>>>>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>>>>>>         .apply(
>>>>>>>>>>             "PrepareForWriting",
>>>>>>>>>>             ParDo.of(
>>>>>>>>>>                 new DoFn<KafkaRecord<String, String>, KV<String, String>>() {
>>>>>>>>>>                   @ProcessElement
>>>>>>>>>>                   public void processElement(ProcessContext c) throws Exception {
>>>>>>>>>>                     c.output(KV.of(c.element().getKV().getKey(), c.element().getKV().getValue()));
>>>>>>>>>>                   }
>>>>>>>>>>                 }))
>>>>>>>>>>         .apply(
>>>>>>>>>>             "WriteToKafka",
>>>>>>>>>>             KafkaIO.<String, String>write()
>>>>>>>>>>                 .withBootstrapServers(options.getBootstrap())
>>>>>>>>>>                 .withTopic(options.getOutputTopic())
>>>>>>>>>>                 .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
>>>>>>>>>>                 .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class));
>>>>>>>>>>
>>>>>>>>>>     pipeline.run();
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>> ~~~~~~~~
>>>>>>>>>>
>>>>>>>>>> I'm firing the Java version as follows:
>>>>>>>>>>
>>>>>>>>>> $ mvn exec:java \
>>>>>>>>>>     -Dexec.mainClass=org.apache.beam.tutorial.analytic.KafkaTest \
>>>>>>>>>>     -Pflink-runner \
>>>>>>>>>>     -Dexec.args="--runner=FlinkRunner"
>>>>>>>>>>
>>>>>>>>>> And I can see in real time that, as I publish records to the
>>>>>>>>>> in_topic, the out_topic receives them on a continuous basis.
>>>>>>>>>>
>>>>>>>>>> I hope this helps narrow down the issue.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Sumeet
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 11, 2021 at 11:27 AM Chamikara Jayalath <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you able to run a similar Java streaming pipeline using
>>>>>>>>>>> KafkaIO and Flink (without x-lang)?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Cham
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Cham!
>>>>>>>>>>>>
>>>>>>>>>>>> So finally I was able to get partial success.
>>>>>>>>>>>> Since I had pre-populated the Kafka topic (in_topic) with 3 records,
>>>>>>>>>>>> I set max_num_records=3 to see if it can read all existing records,
>>>>>>>>>>>> as follows:
>>>>>>>>>>>>
>>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>>     _ = (
>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>         | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>>>>>             consumer_config={
>>>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers,
>>>>>>>>>>>>                 'auto.offset.reset': 'earliest'},
>>>>>>>>>>>>             topics=[in_topic],
>>>>>>>>>>>>             max_num_records=3)
>>>>>>>>>>>>         | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>>>>             producer_config={
>>>>>>>>>>>>                 'bootstrap.servers': bootstrap_servers},
>>>>>>>>>>>>             topic=out_topic))
>>>>>>>>>>>>
>>>>>>>>>>>> I was able to see all 3 records being read, and written
>>>>>>>>>>>> successfully to the out_topic as well. So, it appears that there
>>>>>>>>>>>> might be some issue with reading unbounded Kafka streams here? Or is
>>>>>>>>>>>> there any setting that I might be missing?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Sumeet
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hey Cham!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Appreciate the response. I tried out your suggestions (details
>>>>>>>>>>>>> below), but I still don't see any data being consumed or written
>>>>>>>>>>>>> back to Kafka (as per your suggestion). I'm also providing
>>>>>>>>>>>>> additional details/context that might help narrow down the issue.
>>>>>>>>>>>>> Apologies for being a bit verbose from hereon!
>>>>>>>>>>>>>
>>>>>>>>>>>>> First, here's what my pipeline code looks like now:
>>>>>>>>>>>>>
>>>>>>>>>>>>> ~~~~~~
>>>>>>>>>>>>> import argparse
>>>>>>>>>>>>> import logging
>>>>>>>>>>>>>
>>>>>>>>>>>>> import apache_beam as beam
>>>>>>>>>>>>> from apache_beam.io.kafka import ReadFromKafka
>>>>>>>>>>>>> from apache_beam.io.kafka import WriteToKafka
>>>>>>>>>>>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>>>>>>>>>>>
>>>>>>>>>>>>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>>>>>>>>>>>>     pipeline_options = PipelineOptions(
>>>>>>>>>>>>>         pipeline_args, save_main_session=True, streaming=True)
>>>>>>>>>>>>>
>>>>>>>>>>>>>     logging.info(
>>>>>>>>>>>>>         'Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
>>>>>>>>>>>>>         str(bootstrap_servers), in_topic, out_topic)
>>>>>>>>>>>>>
>>>>>>>>>>>>>     with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>>>         _ = (
>>>>>>>>>>>>>             pipeline
>>>>>>>>>>>>>             | 'Read from kafka' >> ReadFromKafka(
>>>>>>>>>>>>>                 consumer_config={
>>>>>>>>>>>>>                     'bootstrap.servers': bootstrap_servers,
>>>>>>>>>>>>>                     'auto.offset.reset': 'earliest'
>>>>>>>>>>>>>                 },
>>>>>>>>>>>>>                 topics=[in_topic])
>>>>>>>>>>>>>             | 'Write to kafka' >> WriteToKafka(
>>>>>>>>>>>>>                 producer_config={
>>>>>>>>>>>>>                     'bootstrap.servers': bootstrap_servers
>>>>>>>>>>>>>                 },
>>>>>>>>>>>>>                 topic=out_topic))
>>>>>>>>>>>>>
>>>>>>>>>>>>> if __name__ == '__main__':
>>>>>>>>>>>>>     logging.getLogger().setLevel(logging.INFO)
>>>>>>>>>>>>>
>>>>>>>>>>>>>     parser = argparse.ArgumentParser()
>>>>>>>>>>>>>     parser.add_argument(
>>>>>>>>>>>>>         '--bootstrap_servers',
>>>>>>>>>>>>>         dest='bootstrap_servers',
>>>>>>>>>>>>>         required=True,
>>>>>>>>>>>>>         help='Bootstrap servers for the Kafka cluster')
>>>>>>>>>>>>>     parser.add_argument(
>>>>>>>>>>>>>         '--in_topic',
>>>>>>>>>>>>>         dest='in_topic',
>>>>>>>>>>>>>         required=True,
>>>>>>>>>>>>>         help='Kafka topic to read data from')
>>>>>>>>>>>>>     parser.add_argument(
>>>>>>>>>>>>>         '--out_topic',
>>>>>>>>>>>>>         dest='out_topic',
>>>>>>>>>>>>>         required=True,
>>>>>>>>>>>>>         help='Kafka topic to write data to')
>>>>>>>>>>>>>     # Anything argparse does not recognise is passed on to Beam.
>>>>>>>>>>>>>     known_args, pipeline_args = parser.parse_known_args()
>>>>>>>>>>>>>
>>>>>>>>>>>>>     run(known_args.bootstrap_servers, known_args.in_topic,
>>>>>>>>>>>>>         known_args.out_topic, pipeline_args)
>>>>>>>>>>>>> ~~~~~~
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm firing this pipeline as follows:
>>>>>>>>>>>>>
>>>>>>>>>>>>> python ./pipeline.py --bootstrap_servers=localhost:29092 \
>>>>>>>>>>>>>     --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have pre-populated the Kafka topic with 3 records:
>>>>>>>>>>>>>
>>>>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t in_topic
>>>>>>>>>>>>> v1
>>>>>>>>>>>>> v2
>>>>>>>>>>>>> v3
>>>>>>>>>>>>>
>>>>>>>>>>>>> Now, when I execute the pipeline, I see that it starts to read
>>>>>>>>>>>>> records from offset 0, but then seeks to the latest offset 3
>>>>>>>>>>>>> without processing the records. I don't see any data written to
>>>>>>>>>>>>> out_topic. I filtered the logs a bit, and this is what I'm seeing:
>>>>>>>>>>>>>
>>>>>>>>>>>>> INFO:root:Starting data pipeline. bootstrap_servers=localhost:29092 in_topic=in_topic out_topic=out_topic
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions assigned to split 0 (total 1): in_topic-0'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Subscribed to partition(s): in_topic-0'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Resetting offset for partition in_topic-0 to offset 0.'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading from in_topic-0 starting at offset 0'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to partition(s): in_topic-0'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST offset of partition in_topic-0'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for partition in_topic-0 to offset 3.'
>>>>>>>>>>>>>
>>>>>>>>>>>>> Additionally, the logs also emit complete consumer and producer configs.
>>>>>>>>>>>>> I'm dumping them here, in case that helps:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Consumer Config:
>>>>>>>>>>>>>
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig values:'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics = true'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.commit.interval.ms = 5000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset = earliest'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = [localhost:29092]'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = default'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack ='
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms = 540000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdefault.api.timeout.ms = 60000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit = false'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics = true'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes = 52428800'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms = 500'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id = Reader-0_offset_consumer_1947524890_none'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\theartbeat.interval.ms = 3000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close = true'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level = read_uncommitted'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes = 1048576'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms = 300000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = 500'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = 300000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level = INFO'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms = 30000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = 65536'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms = 1000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = 50'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = 30000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd = /usr/bin/kinit'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin = 60000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter = 0.05'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor = 0.8'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds = 300'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds = 60'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor = 0.8'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter = 0.05'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = PLAINTEXT'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = 131072'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms = 10000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm = https'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm = SunX509'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm = PKIX'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>>>>>>>>>>>>
>>>>>>>>>>>>> Producer Config:
>>>>>>>>>>>>>
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig values:'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = [localhost:29092]'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory = 33554432'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = default'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type = none'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms = 540000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms = 120000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence = false'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = 60000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection = 5'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size = 1048576'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = 300000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level = INFO'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms = 30000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = 32768'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms = 1000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = 50'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = 30000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd = /usr/bin/kinit'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin = 60000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter = 0.05'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor = 0.8'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds = 300'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds = 60'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor = 0.8'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter = 0.05'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = PLAINTEXT'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = 131072'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm = https'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm = SunX509'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm = PKIX'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransaction.timeout.ms = 60000'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id = null'
>>>>>>>>>>>>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer'
>>>>>>>>>>>>>
>>>>>>>>>>>>> Apologies again for dumping almost everything here :-) Any
>>>>>>>>>>>>> pointers on what might be the issue are appreciated.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Sumeet
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, can you try sending messages back to Kafka (or another
>>>>>>>>>>>>>> distributed system like GCS) instead of just printing them? Given
>>>>>>>>>>>>>> that multi-language pipelines run SDK containers in Docker, you
>>>>>>>>>>>>>> might not see prints in the original console, I think.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Sumeet,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It seems like your Kafka consumer uses the LATEST offset (which
>>>>>>>>>>>>>>> is the default setting) as the start offset to read, which is 29.
>>>>>>>>>>>>>>> Do you have more than 29 records to read at that point?
>>>>>>>>>>>>>>> If the pipeline is only for testing purposes, I would recommend
>>>>>>>>>>>>>>> reading from the earliest offset to see whether you get records.
>>>>>>>>>>>>>>> You can do so by constructing your ReadFromKafka like:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>   ReadFromKafka(
>>>>>>>>>>>>>>>       consumer_config={
>>>>>>>>>>>>>>>           'bootstrap.servers': 'localhost:29092',
>>>>>>>>>>>>>>>           'auto.offset.reset': 'earliest'},
>>>>>>>>>>>>>>>       topics=['test'])
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm trying out a simple example of reading data off a Kafka
>>>>>>>>>>>>>>>> topic into Apache Beam. Here's the relevant snippet:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>>>>>>>>>>>>     _ = (
>>>>>>>>>>>>>>>>         pipeline
>>>>>>>>>>>>>>>>         | 'Read from Kafka' >> ReadFromKafka(
>>>>>>>>>>>>>>>>             consumer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>>>>>>>>>>>>             topics=['test'])
>>>>>>>>>>>>>>>>         | 'Print' >> beam.Map(print))
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Using the above Beam pipeline snippet, I don't see any
>>>>>>>>>>>>>>>> messages coming in. Kafka is running locally in a Docker
>>>>>>>>>>>>>>>> container, and I'm able to use `kafkacat` from the host (outside
>>>>>>>>>>>>>>>> the container) to publish and subscribe to messages. So, I guess
>>>>>>>>>>>>>>>> there are no issues on that front.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It appears that Beam is able to connect to Kafka and get
>>>>>>>>>>>>>>>> notified of new messages, as I see the offset changes in the
>>>>>>>>>>>>>>>> Beam logs as I publish data from `kafkacat`:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>>>>>>> timestamp {
>>>>>>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>>>>>>   nanos: 534000000
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>> message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
>>>>>>>>>>>>>>>> log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>>>>>>> thread: "22"
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> INFO:root:severity: INFO
>>>>>>>>>>>>>>>> timestamp {
>>>>>>>>>>>>>>>>   seconds: 1612886861
>>>>>>>>>>>>>>>>   nanos: 537000000
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>> message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
>>>>>>>>>>>>>>>> log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>>>>>>>>>>>>> thread: "22"
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is how I'm publishing data using `kafkacat`:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>>>>>>>>>>>>>>> 1:foo
>>>>>>>>>>>>>>>> 1:bar
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> and I can confirm that it's being received, again using
>>>>>>>>>>>>>>>> `kafkacat`:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
>>>>>>>>>>>>>>>> Key: 1 Value: foo
>>>>>>>>>>>>>>>> Key: 1 Value: bar
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> But despite this, I don't see the actual messages being
>>>>>>>>>>>>>>>> printed by Beam as I expected. Any pointers to what's missing
>>>>>>>>>>>>>>>> here are appreciated. I suspect this could be a decoding issue
>>>>>>>>>>>>>>>> on the Beam pipeline side, but I could be wrong.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks in advance for any pointers!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>> Sumeet
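To make the LATEST versus earliest point from Boyuan's reply concrete, here is a toy model of how a Kafka consumer with no committed offset picks its starting position. It is only a sketch under loud assumptions: start_offset and visible_records are hypothetical helpers, a Python list stands in for a single topic partition, and retention, consumer groups and Beam's own reader logic are all ignored.

```python
def start_offset(log_end_offset, committed_offset=None, auto_offset_reset='latest'):
    """Pick the offset a consumer starts reading from (simplified model)."""
    if committed_offset is not None:
        # A committed offset takes precedence; auto.offset.reset is not consulted.
        return committed_offset
    if auto_offset_reset == 'earliest':
        return 0  # assumes nothing has been deleted from the log yet
    if auto_offset_reset == 'latest':
        return log_end_offset
    raise ValueError('unsupported auto.offset.reset: %r' % auto_offset_reset)


def visible_records(log, **kwargs):
    """Records already in the log that the consumer would actually read."""
    return log[start_offset(len(log), **kwargs):]


# Stand-in for the pre-populated in_topic from the thread.
log = ['v1', 'v2', 'v3']
print(visible_records(log))                               # default: 'latest'
print(visible_records(log, auto_offset_reset='earliest'))
```

With the default setting the consumer starts at the end of the log and sees none of the existing records, which matches the "Seeking to LATEST offset" lines in the logs; with 'earliest' it starts from the first record.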

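When comparing runs like the ones in this thread, it can help to pull just the offset resets out of the log noise. The helper below is a rough sketch, not part of Beam or Kafka: RESET_RE and parse_offset_resets are hypothetical names, and the pattern only assumes the "Resetting offset for partition ... to offset N." wording seen in the logs quoted above.

```python
import re

# Matches the Kafka client message quoted in the thread, e.g.
# "Resetting offset for partition in_topic-0 to offset 3."
RESET_RE = re.compile(
    r"Resetting offset for partition (?P<topic>.+)-(?P<partition>\d+) "
    r"to offset (?P<offset>\d+)\.")


def parse_offset_resets(lines):
    """Return (topic, partition, offset) for every offset-reset log line."""
    resets = []
    for line in lines:
        match = RESET_RE.search(line)
        if match:
            resets.append(
                (match['topic'], int(match['partition']), int(match['offset'])))
    return resets


# Log lines taken from the messages quoted in this thread.
sample = [
    "[Consumer clientId=consumer-2, groupId=null] "
    "Resetting offset for partition in_topic-0 to offset 0.",
    "Reader-0: reading from in_topic-0 starting at offset 0",
    "[Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, "
    "groupId=Reader-0_offset_consumer_1947524890_none] "
    "Resetting offset for partition in_topic-0 to offset 3.",
]
print(parse_offset_resets(sample))
```

Seeing a reset to 0 followed by a reset to the log end, as in the sample, is the pattern Sumeet described: one consumer reads from the earliest offset while the separate offset consumer seeks to LATEST.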