Re: Monitor number of dropped elements due to lateness in Samza/Flink runner

2022-12-07 Thread Nick Caballero


sudden problems in running a test pipeline

2022-12-07 Thread Sofia’s World
Hello all,
 I have been writing pipelines with Beam, and suddenly my unit tests fail with
a weird exception.
This is a sample pipeline I have that basically does nothing:

key = os.environ['FMPREPKEY']


with TestPipeline() as p:
    (p | 'start run_mm' >> beam.Create(['20210101'])
       | 'prnt' >> beam.Map(print)
    )


and still I am getting the exception below.
Have I somehow messed up my imports?

Kind regards
 Marco


self = 
action = _StoreAction(option_strings=['--key'], dest='key', nargs=None,
const=None, default=None, type=None, choices=None, help=None, metavar=None)
conflicting_actions = [('--key', _StoreAction(option_strings=['--key'],
dest='key', nargs=None, const=None, default=None, type=None, choices=None,
help=None, metavar=None))]

    def _handle_conflict_error(self, action, conflicting_actions):
        message = ngettext('conflicting option string: %s',
                           'conflicting option strings: %s',
                           len(conflicting_actions))
        conflict_string = ', '.join([option_string
                                     for option_string, action
                                     in conflicting_actions])
>       raise ArgumentError(action, message % conflict_string)
E       argparse.ArgumentError: argument --key: conflicting option string: --key
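
For reference, argparse raises this error whenever the same option string is registered twice on one parser. The sketch below uses only the standard library to show that mechanism. The helper names build_parser and try_build are hypothetical, and the code does not reproduce Beam's own option handling.

```python
import argparse


def build_parser(option_names):
    """Register each option string on a single parser (hypothetical helper)."""
    parser = argparse.ArgumentParser(prog="demo", add_help=False)
    for name in option_names:
        # Adding an option string that is already registered raises
        # argparse.ArgumentError under the default conflict handler.
        parser.add_argument(name)
    return parser


def try_build(option_names):
    """Return None on success, or the error text if registration conflicts."""
    try:
        build_parser(option_names)
    except argparse.ArgumentError as err:
        return str(err)
    return None


ok = try_build(["--key", "--other"])    # distinct options: no conflict
conflict = try_build(["--key", "--key"])  # same option registered twice
print(ok)
print(conflict)
```

One possible cause in a Beam test suite, offered as an assumption rather than a confirmed diagnosis, is that two PipelineOptions subclasses imported into the same test session both add --key in _add_argparse_args, so the option ends up registered twice when the options are parsed.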


Exception handling in ReadFromTextWithFilename?

2022-12-07 Thread Daniel Chen via user
Hi friends,

I recently encountered an issue with the Beam Python SDK (2.43.0) while
using ReadFromTextWithFilename on a Google Cloud Storage (GCS) bucket
that contains roughly 95k gzip-compressed CSV files. One of the files was
truncated in transit, so the job ran for a couple of hours before failing
with an exception like "zlib.error: Error -3 while decompressing data:
incorrect header check" raised from within the apache_beam.io.filesystem
module. The exception didn't indicate the filename of the truncated file, and
from looking through the standard library, I couldn't find any mechanism to
handle the exception or to return additional context that would have
allowed me to remediate the situation.

Is there an example of how to handle this situation? Ideally, the library
would return a PCollection of filenames that encountered errors while
reading or something similar to that for further processing rather than
causing a job to crash.
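
The behaviour asked for here can be sketched outside Beam with the standard library alone: read each gzip file inside a try/except and return the filename together with either its lines or the error, instead of letting the exception escape. The function name read_gzip_lines and the demo files are hypothetical, and this is not an existing Beam API.

```python
import gzip
import os
import tempfile
import zlib


def read_gzip_lines(path):
    """Return ("ok", path, lines) or ("error", path, message); never raises
    for the decompression failures listed below (hypothetical helper)."""
    try:
        with gzip.open(path, "rt", encoding="utf-8") as handle:
            lines = [line.rstrip("\n") for line in handle]
    except (OSError, EOFError, zlib.error) as err:
        # OSError covers a bad gzip header, EOFError a truncated stream,
        # zlib.error corrupted compressed data.
        return ("error", path, f"{type(err).__name__}: {err}")
    return ("ok", path, lines)


# Demo: one intact file and one deliberately truncated copy.
with tempfile.TemporaryDirectory() as tmp:
    good_path = os.path.join(tmp, "good.csv.gz")
    bad_path = os.path.join(tmp, "truncated.csv.gz")
    payload = gzip.compress(b"a,b\n1,2\n")
    with open(good_path, "wb") as f:
        f.write(payload)
    with open(bad_path, "wb") as f:
        f.write(payload[: len(payload) // 2])  # cut the stream in half
    results = [read_gzip_lines(p) for p in (good_path, bad_path)]

# Collect the names of files that could not be read.
failed_files = [
    os.path.basename(path) for status, path, _ in results if status == "error"
]
print(failed_files)
```

Inside a pipeline, the same try/except could plausibly live in a DoFn applied to the output of fileio.MatchFiles and fileio.ReadMatches, with failures sent to a tagged output so that the bad filenames form their own PCollection. That arrangement is an assumption about how one might structure it, not something confirmed in this thread.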