Hi Eila,

The error "work item was attempted 4 times without success" indicates that
some operation is consistently failing. You can find more information in
Dataflow worker logs [1] about the actual error.

I cannot tell for sure without looking at the logs, but I suspect your issue
is related to the process method of H5Preprocess. Unless you are using the
save_main_session flag, the reference to h5py there will result in a
NameError on the workers (more on this in [2]). You can resolve it by
importing h5py locally, inside process (similar to the way you imported
gcsio above).
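
To make the suggestion concrete, here is a rough sketch of H5Preprocess with
the import moved inside process. It is untested against your job and makes a
few assumptions: that each element is the (path, bytes) pair yielded by
ReadGcsBlobs, and that the installed h5py accepts a file-like object (I
believe this needs h5py 2.9 or later). It also binds the opened file to hdf,
which the quoted code refers to without assigning. The try/except around the
Beam import is only there so the snippet can be loaded on a machine without
Beam installed.

```python
import io
import logging

try:
    import apache_beam as beam
    _DoFnBase = beam.DoFn
except ImportError:
    # Fallback used only so this sketch can be loaded without Beam installed.
    _DoFnBase = object


class H5Preprocess(_DoFnBase):
    def process(self, element):
        # Import inside process() so the name h5py is resolved on the worker
        # instead of depending on the pickled main session.
        import h5py

        # Assumption: element is the (gcs_path, file_bytes) pair from ReadGcsBlobs.
        path, contents = element

        logging.info('starting to read H5')
        # Assumption: this h5py version accepts a file-like object.
        with h5py.File(io.BytesIO(contents), 'r') as hdf:
            expression = hdf['/data/expression']
            # Same slice as in the original code, read before the file closes.
            np_expression = expression[1:2, 1:2]
        logging.info('finished reading the expression subset')

        yield (path, np_expression)
```

The alternative is to keep the module-level import and set
pipeline_options.view_as(SetupOptions).save_main_session = True before
constructing the pipeline; your script already imports SetupOptions.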

For future questions related to getting support on Google Cloud Dataflow
please see their support page [3].

Thank you,
Ahmet

[1]
https://cloud.google.com/dataflow/pipelines/logging#cloud-dataflow-worker-log-example
[2] https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors
[3] https://cloud.google.com/dataflow/support

On Tue, Feb 13, 2018 at 5:21 PM, OrielResearch Eila Arich-Landkof <
e...@orielresearch.org> wrote:

> Hello,
>
> Any help will be greatly appreciated!!!
>
> I am using Dataflow to process an H5 (HDF5 format
> <https://support.hdfgroup.org/HDF5/>) file.
>
> The H5 file was uploaded to Google Storage from:
> https://amp.pharm.mssm.edu/archs4/download.html
>
> H5 / HDF5 is a hierarchical data structure for representing scientific data.
>
>
> My code:
>
> I have created a setup.py file that is based on the juliaset
> <https://github.com/apache/beam/blob/5c74022da1bb8f8a822cf3c545f96f2903d175a4/sdks/python/apache_beam/examples/complete/juliaset/setup.py>
>  example that was referenced in one of the other tickets. My only change
> there is the list of packages to install:
>
> REQUIRED_PACKAGES = [
>     'numpy',
>     'h5py',
>     'pandas',
>     'tables',
>     ]
>
> The pipeline is the following:
>
> import numpy as np
> import h5py
> import pandas as pd
> import argparse
> import logging
> import re
> import apache_beam as beam
> from apache_beam.io import ReadFromText
> from apache_beam.io import WriteToText
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import SetupOptions
>
>
> class ReadGcsBlobs(beam.DoFn):
>     def process(self, element, *args, **kwargs):
>         from apache_beam.io.gcp import gcsio
>         gcs = gcsio.GcsIO()
>         yield (element, gcs.open(element).read())
>
> class H5Preprocess(beam.DoFn):
>     def process(self, element):
>         logging.info('**********starting to read H5')
>         h5py.File(element, 'r')
>         logging.info('**********finished reading H5')
>         expression = hdf['/data/']['expression']
>         logging.info('**********finished reading the expression node')
>         np_expression = expression[1:2,1:2]
>         logging.info('**********subset the expression to numpy 2x2')
>         yield (element, np_expression)
>
> def run(argv=None):
>     pipeline_options = PipelineOptions(argv)
>     parser = argparse.ArgumentParser(description="read from h5 blog and write to file")
>     logging.info('**********finish with the parser')
>
>     with beam.Pipeline(options=pipeline_options) as p:
>             (p
>                 | 'Initialize' >> beam.Create(['gs://archs4/human_matrix.h5'])
>                 | 'Read-blobs' >> beam.ParDo(ReadGcsBlobs())
>                 | 'pre-process' >> beam.ParDo(H5Preprocess())
>                 | 'write' >> beam.io.WriteToText('gs://archs4/outputData.txt')
>             )
>     p.run()
>
> if __name__ == '__main__':
>     logging.getLogger().setLevel(logging.INFO)
>     run()
>
> the execution command is the following:
>
> python beam_try1.py --job-name beam-try1 --project orielresearch-188115 
> --runner DataflowRunner --setup_file ./setup.py 
> --temp_location=gs://archs4/tmp --staging_location gs://archs4/staging
>
> and the pipeline Error is the following:
>
> (5a4c72cfc5507714): Workflow failed. Causes: (3bde8bf810c652b2): 
> S04:Initialize/Read+Read-blobs+pre-process+write/Write/WriteImpl/WriteBundles/WriteBundles+write/Write/WriteImpl/Pair+write/Write/WriteImpl/WindowInto(WindowIntoFn)+write/Write/WriteImpl/GroupByKey/Reify+write/Write/WriteImpl/GroupByKey/Write
>  failed., (7b4a7abb1a692d12): A work item was attempted 4 times without 
> success. Each time the worker eventually lost contact with the service. The 
> work item was attempted on:
>   beamapp-eila-0213182449-2-02131024-1621-harness-vf4f,
>   beamapp-eila-0213182449-2-02131024-1621-harness-vf4f,
>   beamapp-eila-0213182449-2-02131024-1621-harness-vf4f,
>   beamapp-eila-0213182449-2-02131024-1621-harness-vf4f
>
> Could you please advise what needs to be fixed?
>
> Thanks,
>
> Eila
>
>
> --
> Eila
> www.orielresearch.org
> https://www.meetup.com/Deep-Learning-In-Production/
>
>
