Please help.
On Fri, 4 Jun 2021, 19:58 Shankar Mane, <[email protected]> wrote:
> Team,
>
> I am new to Beam and am running a simple Python pipeline with Flink as the
> runner. The pipeline finishes successfully, but I am not getting the expected
> output at the end.
>
> #------------------------------------------------------------------------------------------------------------
>
> Here is the pipeline code:
>
> #!/usr/bin/python3
>>
>> import argparse
>> import logging
>> import re
>> import typing
>>
>> import apache_beam as beam
>> from apache_beam.io import ReadFromText
>> from apache_beam.io import WriteToText
>> from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
>>
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.options.pipeline_options import SetupOptions
>> from decimal import Decimal
>>
>>
>> class AmoountExtractingDoFn(beam.DoFn):
>> def process(self, element):
>> # Returns an iterator over the words of this element.
>> try:
>> strArr = str(element).replace("\"", "").split(",")
>> print(strArr)
>> return [(strArr[1], float(strArr[2]))]
>> except Exception as e :
>> pass
>>
>> # Format the counts into a PCollection of strings.
>> def format_result(userid, amount):
>> try:
>> return '%s = %d' % (userid, amount)
>> except Exception as e:
>> pass
>>
>> def run(argv=None, save_main_session=True):
>> """Main entry point; defines and runs the wordcount pipeline."""
>> parser = argparse.ArgumentParser()
>> parser.add_argument(
>> '--input',
>> dest='input',
>> default='gs://dataflow-samples/shakespeare/kinglear.txt',
>> help='Input file to process.')
>> parser.add_argument(
>> '--output',
>> dest='output',
>> required=True,
>> help='Output file to write results to.')
>> known_args, pipeline_args = parser.parse_known_args(argv)
>>
>> # We use the save_main_session option because one or more DoFn's in this
>> # workflow rely on global context (e.g., a module imported at module
>> level).
>> pipeline_options = PipelineOptions(pipeline_args)
>> pipeline_options.view_as(SetupOptions).save_main_session =
>> save_main_session
>>
>> # The pipeline will be run on exiting the with block.
>> with beam.Pipeline(options=pipeline_options) as p:
>> # Read the text file[pattern] into a PCollection.
>> lines = p | 'Read' >> ReadFromText(known_args.input)
>> counts = (
>> lines
>> | 'ExtractAmountPerUser' >> (beam.ParDo(AmoountExtractingDoFn()))
>> | 'GroupAndSum' >> beam.CombinePerKey(sum))
>>
>> output = counts | 'Format' >> beam.MapTuple(format_result)
>>
>> output | beam.Map(lambda x: (b'',
>> x.encode('utf-8'))).with_output_types(typing.Tuple[bytes, bytes])\
>> | 'Write to Kafka' >> WriteToKafka(producer_config={'bootstrap.servers':
>> '10.xxx.xxx.xxx:9092'}, topic='test2')
>>
>>
>
> #------------------------------------------------------------------------------------------------------------
>
> Here are the commands used to run the above pipeline:
>
> CMD-1: The pipeline runs fine on Flink, finishes successfully, and the
> output is generated at the end.
> CMD-2: The pipeline runs fine on Flink and finishes successfully, but NO
> OUTPUT is generated at the end.
>
>
>
>> # CMD-1: *WORKING - output gets generated*
>> python3 batch.py \
>> --input beam-userbase.csv \
>> --output output/batch \
>> --runner=FlinkRunner \
>> --flink_submit_uber_jar \
>> --flink_master=localhost:8090 \
>> --environment_type=LOOPBACK
>>
>>
>
>> # CMD-2: *WORKING - NO OUTPUT*
>
> python3 batch.py \
>> --input beam-userbase.csv \
>> --output output/batch \
>> --runner=FlinkRunner \
>> --flink_submit_uber_jar \
>> --flink_master=localhost:8090 \
>> --environment_type=DOCKER \
>> --environment_config="apache/beam_python3.7_sdk:2.29.0"
>
>
>