Are you able to run a similar Java streaming pipeline using KafkaIO and Flink? (without x-lang)
Thanks,
Cham

On Tue, Mar 9, 2021 at 11:03 PM Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:

> Hi Cham!
>
> So finally I was able to get partial success. Since I had pre-populated
> the Kafka topic (in_topic) with 3 records, I set max_num_records=3 to see
> if it could read all the existing records, as follows:
>
> with beam.Pipeline(options=pipeline_options) as pipeline:
>   _ = (
>       pipeline
>       | 'Read from kafka' >> ReadFromKafka(
>           consumer_config={
>               'bootstrap.servers': bootstrap_servers,
>               'auto.offset.reset': 'earliest'},
>           topics=[in_topic],
>           max_num_records=3)
>       | 'Write to kafka' >> WriteToKafka(
>           producer_config={
>               'bootstrap.servers': bootstrap_servers},
>           topic=out_topic))
>
> I was able to see all 3 records being read, and written successfully to
> the out_topic as well. So it appears that there might be some issue with
> reading unbounded Kafka streams here? Or is there some setting that I
> might be missing?
>
> Thanks,
> Sumeet
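For reference, a runnable distillation of the bounded-read experiment above (a sketch, not the exact script from the thread): the max_num_records argument is what turns the otherwise-unbounded Kafka read into a bounded source, which is why this variant drains the pre-loaded records and exits. The broker address and topic names are assumptions carried over from the thread.

~~~~~~
# Sketch only. Assumes a local broker at localhost:29092 and topics
# 'in_topic' (pre-populated) and 'out_topic', as in the thread.
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
from apache_beam.options.pipeline_options import PipelineOptions

def run_bounded_copy(pipeline_args, num_records=3):
  options = PipelineOptions(pipeline_args, streaming=True, save_main_session=True)
  with beam.Pipeline(options=options) as pipeline:
    _ = (
        pipeline
        | 'Read' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:29092',
                             'auto.offset.reset': 'earliest'},
            topics=['in_topic'],
            # Bounds the read: the pipeline finishes after N records
            # instead of waiting on an unbounded stream.
            max_num_records=num_records)
        | 'Write' >> WriteToKafka(
            producer_config={'bootstrap.servers': 'localhost:29092'},
            topic='out_topic'))

# e.g. run_bounded_copy(['--runner=FlinkRunner'])
~~~~~~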
> On Wed, Mar 10, 2021 at 9:43 AM Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>
>> Hey Cham!
>>
>> Appreciate the response. I tried out your suggestions (details below),
>> including writing back to Kafka as you suggested, but I still don't see
>> any data being consumed or written out. I'm also providing additional
>> details/context that might help narrow down the issue. Apologies for
>> being a bit verbose from here on!
>>
>> First, here's what my pipeline code looks like now:
>>
>> ~~~~~~
>> import logging
>>
>> import apache_beam as beam
>> from apache_beam.io.kafka import ReadFromKafka
>> from apache_beam.io.kafka import WriteToKafka
>> from apache_beam.options.pipeline_options import PipelineOptions
>>
>> def run(bootstrap_servers, in_topic, out_topic, pipeline_args):
>>   pipeline_options = PipelineOptions(
>>       pipeline_args, save_main_session=True, streaming=True)
>>
>>   logging.info(
>>       'Starting data pipeline. bootstrap_servers=%s in_topic=%s out_topic=%s',
>>       str(bootstrap_servers), in_topic, out_topic)
>>
>>   with beam.Pipeline(options=pipeline_options) as pipeline:
>>     _ = (
>>         pipeline
>>         | 'Read from kafka' >> ReadFromKafka(
>>             consumer_config={
>>                 'bootstrap.servers': bootstrap_servers,
>>                 'auto.offset.reset': 'earliest'
>>             },
>>             topics=[in_topic])
>>         | 'Write to kafka' >> WriteToKafka(
>>             producer_config={
>>                 'bootstrap.servers': bootstrap_servers
>>             },
>>             topic=out_topic))
>>
>> if __name__ == '__main__':
>>   logging.getLogger().setLevel(logging.INFO)
>>   import argparse
>>
>>   parser = argparse.ArgumentParser()
>>   parser.add_argument(
>>       '--bootstrap_servers',
>>       dest='bootstrap_servers',
>>       required=True,
>>       help='Bootstrap servers for the Kafka cluster')
>>   parser.add_argument(
>>       '--in_topic',
>>       dest='in_topic',
>>       required=True,
>>       help='Kafka topic to read data from')
>>   parser.add_argument(
>>       '--out_topic',
>>       dest='out_topic',
>>       required=True,
>>       help='Kafka topic to write data to')
>>   known_args, pipeline_args = parser.parse_known_args()
>>
>>   run(known_args.bootstrap_servers, known_args.in_topic,
>>       known_args.out_topic, pipeline_args)
>> ~~~~~~
>>
>> I'm firing this pipeline as follows:
>>
>> python ./pipeline.py --bootstrap_servers=localhost:29092 --in_topic=in_topic --out_topic=out_topic --runner=FlinkRunner
>>
>> I have pre-populated the Kafka topic with 3 records:
>>
>> $ kafkacat -C -b localhost:29092 -t in_topic
>> v1
>> v2
>> v3
>>
>> Now, when I execute the pipeline, I see that it starts to read records
>> from offset 0, but then seeks to the latest offset 3 without processing
>> the records. I don't see any data written to out_topic. I filtered the
>> logs a bit, and this is what I'm seeing:
>>
>> INFO:root:Starting data pipeline. bootstrap_servers=localhost:29092 in_topic=in_topic out_topic=out_topic
>> INFO:apache_beam.utils.subprocess_server:b'INFO: Partitions assigned to split 0 (total 1): in_topic-0'
>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Subscribed to partition(s): in_topic-0'
>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-2, groupId=null] Resetting offset for partition in_topic-0 to offset 0.'
>> INFO:apache_beam.utils.subprocess_server:b'INFO: Reader-0: reading from in_topic-0 starting at offset 0'
>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Subscribed to partition(s): in_topic-0'
>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Seeking to LATEST offset of partition in_topic-0'
>> INFO:apache_beam.utils.subprocess_server:b'INFO: [Consumer clientId=consumer-Reader-0_offset_consumer_1947524890_none-3, groupId=Reader-0_offset_consumer_1947524890_none] Resetting offset for partition in_topic-0 to offset 3.'
>>
>> Additionally, the logs also emit the complete consumer and producer
>> configs. I'm dumping them here, in case that helps:
>>
>> Consumer Config:
>>
>> INFO:apache_beam.utils.subprocess_server:b'INFO: ConsumerConfig values:'
>> INFO:apache_beam.utils.subprocess_server:b'\tallow.auto.create.topics = true'
>> INFO:apache_beam.utils.subprocess_server:b'\tauto.commit.interval.ms = 5000'
>> INFO:apache_beam.utils.subprocess_server:b'\tauto.offset.reset = earliest'
>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = [localhost:29092]'
>> INFO:apache_beam.utils.subprocess_server:b'\tcheck.crcs = true'
>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = default'
>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>> INFO:apache_beam.utils.subprocess_server:b'\tclient.rack ='
>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms = 540000'
>> INFO:apache_beam.utils.subprocess_server:b'\tdefault.api.timeout.ms = 60000'
>> INFO:apache_beam.utils.subprocess_server:b'\tenable.auto.commit = false'
>> INFO:apache_beam.utils.subprocess_server:b'\texclude.internal.topics = true'
>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.bytes = 52428800'
>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.max.wait.ms = 500'
>> INFO:apache_beam.utils.subprocess_server:b'\tfetch.min.bytes = 1'
>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.id = Reader-0_offset_consumer_1947524890_none'
>> INFO:apache_beam.utils.subprocess_server:b'\tgroup.instance.id = null'
>> INFO:apache_beam.utils.subprocess_server:b'\theartbeat.interval.ms = 3000'
>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []'
>> INFO:apache_beam.utils.subprocess_server:b'\tinternal.leave.group.on.close = true'
>> INFO:apache_beam.utils.subprocess_server:b'\tisolation.level = read_uncommitted'
>> INFO:apache_beam.utils.subprocess_server:b'\tkey.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer'
>> INFO:apache_beam.utils.subprocess_server:b'\tmax.partition.fetch.bytes = 1048576'
>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.interval.ms = 300000'
>> INFO:apache_beam.utils.subprocess_server:b'\tmax.poll.records = 500'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = 300000'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level = INFO'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms = 30000'
>> INFO:apache_beam.utils.subprocess_server:b'\tpartition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]'
>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = 65536'
>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms = 1000'
>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = 50'
>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = 30000'
>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd = /usr/bin/kinit'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin = 60000'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter = 0.05'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor = 0.8'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds = 300'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds = 60'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor = 0.8'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter = 0.05'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = PLAINTEXT'
>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = 131072'
>> INFO:apache_beam.utils.subprocess_server:b'\tsession.timeout.ms = 10000'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm = https'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm = SunX509'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm = PKIX'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer'
>>
>> Producer Config:
>>
>> INFO:apache_beam.utils.subprocess_server:b'INFO: ProducerConfig values:'
>> INFO:apache_beam.utils.subprocess_server:b'\tacks = 1'
>> INFO:apache_beam.utils.subprocess_server:b'\tbatch.size = 16384'
>> INFO:apache_beam.utils.subprocess_server:b'\tbootstrap.servers = [localhost:29092]'
>> INFO:apache_beam.utils.subprocess_server:b'\tbuffer.memory = 33554432'
>> INFO:apache_beam.utils.subprocess_server:b'\tclient.dns.lookup = default'
>> INFO:apache_beam.utils.subprocess_server:b'\tclient.id ='
>> INFO:apache_beam.utils.subprocess_server:b'\tcompression.type = none'
>> INFO:apache_beam.utils.subprocess_server:b'\tconnections.max.idle.ms = 540000'
>> INFO:apache_beam.utils.subprocess_server:b'\tdelivery.timeout.ms = 120000'
>> INFO:apache_beam.utils.subprocess_server:b'\tenable.idempotence = false'
>> INFO:apache_beam.utils.subprocess_server:b'\tinterceptor.classes = []'
>> INFO:apache_beam.utils.subprocess_server:b'\tkey.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer'
>> INFO:apache_beam.utils.subprocess_server:b'\tlinger.ms = 0'
>> INFO:apache_beam.utils.subprocess_server:b'\tmax.block.ms = 60000'
>> INFO:apache_beam.utils.subprocess_server:b'\tmax.in.flight.requests.per.connection = 5'
>> INFO:apache_beam.utils.subprocess_server:b'\tmax.request.size = 1048576'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetadata.max.age.ms = 300000'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetric.reporters = []'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.num.samples = 2'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.recording.level = INFO'
>> INFO:apache_beam.utils.subprocess_server:b'\tmetrics.sample.window.ms = 30000'
>> INFO:apache_beam.utils.subprocess_server:b'\tpartitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner'
>> INFO:apache_beam.utils.subprocess_server:b'\treceive.buffer.bytes = 32768'
>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.max.ms = 1000'
>> INFO:apache_beam.utils.subprocess_server:b'\treconnect.backoff.ms = 50'
>> INFO:apache_beam.utils.subprocess_server:b'\trequest.timeout.ms = 30000'
>> INFO:apache_beam.utils.subprocess_server:b'\tretries = 3'
>> INFO:apache_beam.utils.subprocess_server:b'\tretry.backoff.ms = 100'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.client.callback.handler.class = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.jaas.config = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.kinit.cmd = /usr/bin/kinit'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.min.time.before.relogin = 60000'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.service.name = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.jitter = 0.05'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.kerberos.ticket.renew.window.factor = 0.8'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.callback.handler.class = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.class = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.buffer.seconds = 300'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.min.period.seconds = 60'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.factor = 0.8'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.login.refresh.window.jitter = 0.05'
>> INFO:apache_beam.utils.subprocess_server:b'\tsasl.mechanism = GSSAPI'
>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.protocol = PLAINTEXT'
>> INFO:apache_beam.utils.subprocess_server:b'\tsecurity.providers = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tsend.buffer.bytes = 131072'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.cipher.suites = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.endpoint.identification.algorithm = https'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.key.password = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keymanager.algorithm = SunX509'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.location = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.password = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.keystore.type = JKS'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.protocol = TLS'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.provider = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.secure.random.implementation = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.trustmanager.algorithm = PKIX'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.location = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.password = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tssl.truststore.type = JKS'
>> INFO:apache_beam.utils.subprocess_server:b'\ttransaction.timeout.ms = 60000'
>> INFO:apache_beam.utils.subprocess_server:b'\ttransactional.id = null'
>> INFO:apache_beam.utils.subprocess_server:b'\tvalue.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer'
>>
>> Apologies again for dumping almost everything here :-) Any pointers on
>> what might be the issue are appreciated.
>>
>> Thanks,
>> Sumeet
>>
>> On Wed, Mar 10, 2021 at 12:32 AM Chamikara Jayalath <chamik...@google.com> wrote:
>>
>>> Also, can you try sending messages back to Kafka (or to another
>>> distributed system like GCS) instead of just printing them? (Given that
>>> multi-language pipelines run SDK containers in Docker, you might not see
>>> prints in the original console, I think.)
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Mar 9, 2021 at 10:26 AM Boyuan Zhang <boyu...@google.com> wrote:
>>>
>>>> Hi Sumeet,
>>>>
>>>> It seems like your Kafka consumer uses the LATEST offset (which is the
>>>> default setting) as the start offset to read, which is 29 here. Do you
>>>> have more than 29 records to read at that point? If the pipeline is only
>>>> for testing purposes, I would recommend reading from the earliest offset
>>>> to see whether you get records. You can do so by constructing your
>>>> ReadFromKafka like:
>>>>
>>>> ReadFromKafka(
>>>>     consumer_config={'bootstrap.servers': 'localhost:29092',
>>>>                      'auto.offset.reset': 'earliest'},
>>>>     topics=['test'])
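Putting the two suggestions above together, here is a minimal sketch: read from the earliest offset, and log each record from inside the worker rather than relying on print (per the note above, print output from a Dockerized SDK harness may never reach the launching console, while logging.info lands in the worker/runner logs). The topic name 'test' and broker address are taken from the thread, not prescribed.

~~~~~~
# Illustrative sketch, not the thread's exact code.
import logging

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

def log_record(record):
  # Executed on the worker; check the worker/runner logs for this output.
  logging.info('Got record: %s', record)
  return record

def run(pipeline_args):
  options = PipelineOptions(pipeline_args, streaming=True)
  with beam.Pipeline(options=options) as pipeline:
    _ = (
        pipeline
        | 'Read' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:29092',
                             'auto.offset.reset': 'earliest'},
            topics=['test'])
        | 'Log' >> beam.Map(log_record))
~~~~~~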
>>>> On Tue, Mar 9, 2021 at 12:25 AM Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I'm trying out a simple example of reading data off a Kafka topic into
>>>>> Apache Beam. Here's the relevant snippet:
>>>>>
>>>>> with beam.Pipeline(options=pipeline_options) as pipeline:
>>>>>   _ = (
>>>>>       pipeline
>>>>>       | 'Read from Kafka' >> ReadFromKafka(
>>>>>           consumer_config={'bootstrap.servers': 'localhost:29092'},
>>>>>           topics=['test'])
>>>>>       | 'Print' >> beam.Map(print))
>>>>>
>>>>> Using the above Beam pipeline snippet, I don't see any messages coming
>>>>> in. Kafka is running locally in a Docker container, and I'm able to use
>>>>> `kafkacat` from the host (outside the container) to publish and
>>>>> subscribe to messages. So I guess there are no issues on that front.
>>>>>
>>>>> It appears that Beam is able to connect to Kafka and get notified of
>>>>> new messages, as I see the offset changes in the Beam logs as I publish
>>>>> data from `kafkacat`:
>>>>>
>>>>> INFO:root:severity: INFO
>>>>> timestamp {
>>>>>   seconds: 1612886861
>>>>>   nanos: 534000000
>>>>> }
>>>>> message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
>>>>> log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>> thread: "22"
>>>>>
>>>>> INFO:root:severity: INFO
>>>>> timestamp {
>>>>>   seconds: 1612886861
>>>>>   nanos: 537000000
>>>>> }
>>>>> message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Resetting offset for partition test-0 to offset 29."
>>>>> log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
>>>>> thread: "22"
>>>>>
>>>>> This is how I'm publishing data using `kafkacat`:
>>>>>
>>>>> $ kafkacat -P -b localhost:29092 -t test -K:
>>>>> 1:foo
>>>>> 1:bar
>>>>>
>>>>> and I can confirm that it's being received, again using `kafkacat`:
>>>>>
>>>>> $ kafkacat -C -b localhost:29092 -t test -f 'Key: %k Value: %s\n'
>>>>> Key: 1 Value: foo
>>>>> Key: 1 Value: bar
>>>>>
>>>>> But despite this, I don't see the actual message being printed by Beam
>>>>> as I expected. Any pointers to what's missing here are appreciated. I
>>>>> suspect this could be a decoding issue on the Beam pipeline side, but I
>>>>> could be wrong.
>>>>>
>>>>> Thanks in advance for any pointers!
>>>>>
>>>>> Cheers,
>>>>> Sumeet
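On the decoding suspicion at the end: with the default byte-array deserializers, ReadFromKafka surfaces each record on the Python side as a (key, value) tuple of raw bytes, so even when records do flow, print shows byte strings. A hedged sketch of decoding before printing, assuming UTF-8 payloads as in the kafkacat examples; decode_kv is a hypothetical helper, not part of the Beam API:

~~~~~~
import apache_beam as beam

def decode_kv(record):
  # record is a (key, value) tuple of bytes; the key may be None if the
  # producer did not set one.
  key, value = record
  return (key.decode('utf-8') if key is not None else None,
          value.decode('utf-8'))

# Usage, spliced into the original snippet:
#   ... | 'Decode' >> beam.Map(decode_kv) | 'Print' >> beam.Map(print)
~~~~~~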