Team,
I am new to Beam and am running a simple Python pipeline with Flink as the
runner. The pipeline finishes successfully, but I am not getting the
expected output at the end.
#------------------------------------------------------------------------------------------------------------
Here is the pipeline code:
#!/usr/bin/python3
>
> import argparse
> import logging
> import re
> import typing
>
> import apache_beam as beam
> from apache_beam.io import ReadFromText
> from apache_beam.io import WriteToText
> from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
>
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import SetupOptions
> from decimal import Decimal
>
>
> class AmoountExtractingDoFn(beam.DoFn):
> def process(self, element):
> # Returns an iterator over the words of this element.
> try:
> strArr = str(element).replace("\"", "").split(",")
> print(strArr)
> return [(strArr[1], float(strArr[2]))]
> except Exception as e :
> pass
>
> # Format the counts into a PCollection of strings.
> def format_result(userid, amount):
> try:
> return '%s = %d' % (userid, amount)
> except Exception as e:
> pass
>
> def run(argv=None, save_main_session=True):
> """Main entry point; defines and runs the wordcount pipeline."""
> parser = argparse.ArgumentParser()
> parser.add_argument(
> '--input',
> dest='input',
> default='gs://dataflow-samples/shakespeare/kinglear.txt',
> help='Input file to process.')
> parser.add_argument(
> '--output',
> dest='output',
> required=True,
> help='Output file to write results to.')
> known_args, pipeline_args = parser.parse_known_args(argv)
>
> # We use the save_main_session option because one or more DoFn's in this
> # workflow rely on global context (e.g., a module imported at module level).
> pipeline_options = PipelineOptions(pipeline_args)
> pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
>
> # The pipeline will be run on exiting the with block.
> with beam.Pipeline(options=pipeline_options) as p:
> # Read the text file[pattern] into a PCollection.
> lines = p | 'Read' >> ReadFromText(known_args.input)
> counts = (
> lines
> | 'ExtractAmountPerUser' >> (beam.ParDo(AmoountExtractingDoFn()))
> | 'GroupAndSum' >> beam.CombinePerKey(sum))
>
> output = counts | 'Format' >> beam.MapTuple(format_result)
>
> output | beam.Map(lambda x: (b'',
> x.encode('utf-8'))).with_output_types(typing.Tuple[bytes, bytes])\
> | 'Write to Kafka' >> WriteToKafka(producer_config={'bootstrap.servers':
> '10.xxx.xxx.xxx:9092'}, topic='test2')
>
>
#------------------------------------------------------------------------------------------------------------
Here are the commands used to run the above pipeline:
CMD-1: This pipeline runs on Flink, finishes successfully, and the expected
output is generated at the end.
CMD-2: This pipeline also runs on Flink and finishes successfully, but NO
output is generated at the end.
> # CMD-1: WORKING - output gets generated
> python3 batch.py \
> --input beam-userbase.csv \
> --output output/batch \
> --runner=FlinkRunner \
> --flink_submit_uber_jar \
> --flink_master=localhost:8090 \
> --environment_type=LOOPBACK
>
>
> # CMD-2: WORKING - NO OUTPUT
> python3 batch.py \
> --input beam-userbase.csv \
> --output output/batch \
> --runner=FlinkRunner \
> --flink_submit_uber_jar \
> --flink_master=localhost:8090 \
> --environment_type=DOCKER \
> --environment_config="apache/beam_python3.7_sdk:2.29.0"