Hello,

Any help would be greatly appreciated!
I am using Dataflow to process an H5 file (HDF5 format, https://support.hdfgroup.org/HDF5/). HDF5 is a hierarchical data format for scientific data. The file was uploaded to Google Cloud Storage from https://amp.pharm.mssm.edu/archs4/download.html.

I created a setup.py file based on the juliaset example (https://github.com/apache/beam/blob/5c74022da1bb8f8a822cf3c545f96f2903d175a4/sdks/python/apache_beam/examples/complete/juliaset/setup.py) that was referenced in one of the other tickets. My only change there is the list of packages to install:

    REQUIRED_PACKAGES = [
        'numpy',
        'h5py',
        'pandas',
        'tables',
    ]

The pipeline is the following:

    import argparse
    import logging

    import h5py
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions


    class ReadGcsBlobs(beam.DoFn):
        def process(self, element, *args, **kwargs):
            from apache_beam.io.gcp import gcsio
            gcs = gcsio.GcsIO()
            yield (element, gcs.open(element).read())


    class H5Preprocess(beam.DoFn):
        def process(self, element):
            logging.info('********** starting to read H5')
            hdf = h5py.File(element, 'r')
            logging.info('********** finished reading H5')
            expression = hdf['/data/']['expression']
            logging.info('********** finished reading the expression node')
            np_expression = expression[1:2, 1:2]
            logging.info('********** subset the expression to a 2x2 numpy array')
            yield (element, np_expression)


    def run(argv=None):
        pipeline_options = PipelineOptions(argv)
        parser = argparse.ArgumentParser(
            description='read from an H5 blob and write to a file')
        logging.info('********** finished with the parser')
        with beam.Pipeline(options=pipeline_options) as p:
            (p
             | 'Initialize' >> beam.Create(['gs://archs4/human_matrix.h5'])
             | 'Read-blobs' >> beam.ParDo(ReadGcsBlobs())
             | 'pre-process' >> beam.ParDo(H5Preprocess())
             | 'write' >> beam.io.WriteToText('gs://archs4/outputData.txt'))


    if __name__ == '__main__':
        logging.getLogger().setLevel(logging.INFO)
        run()
The execution command is the following:

    python beam_try1.py --job-name beam-try1 --project orielresearch-188115 \
        --runner DataflowRunner --setup_file ./setup.py \
        --temp_location=gs://archs4/tmp --staging_location gs://archs4/staging

and the pipeline error is the following:

    (5a4c72cfc5507714): Workflow failed. Causes: (3bde8bf810c652b2):
    S04:Initialize/Read+Read-blobs+pre-process+write/Write/WriteImpl/WriteBundles/WriteBundles+write/Write/WriteImpl/Pair+write/Write/WriteImpl/WindowInto(WindowIntoFn)+write/Write/WriteImpl/GroupByKey/Reify+write/Write/WriteImpl/GroupByKey/Write failed.,
    (7b4a7abb1a692d12): A work item was attempted 4 times without success.
    Each time the worker eventually lost contact with the service. The work
    item was attempted on:
    beamapp-eila-0213182449-2-02131024-1621-harness-vf4f,
    beamapp-eila-0213182449-2-02131024-1621-harness-vf4f,
    beamapp-eila-0213182449-2-02131024-1621-harness-vf4f,
    beamapp-eila-0213182449-2-02131024-1621-harness-vf4f

Could you please advise what needs to be fixed?

Thanks,
Eila

--
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/
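P.S. For reference, here is a minimal local sketch (plain Python, no Beam or h5py required) of how elements flow between the two DoFns. It highlights one thing worth checking: ReadGcsBlobs yields a (path, bytes) tuple, so the next step's process() receives a tuple rather than a filename, and h5py.File cannot open raw bytes directly (newer h5py versions can open a file-like wrapper such as io.BytesIO). The `fake_bytes` value below is a placeholder standing in for the real `gcs.open(element).read()` call.

```python
import io

def read_gcs_blobs(element):
    # Stand-in for ReadGcsBlobs.process(): in the real pipeline the
    # bytes come from gcs.open(element).read().
    fake_bytes = b'\x89HDF fake contents'  # placeholder, not a real H5 file
    yield (element, fake_bytes)

def h5_preprocess(element):
    # Stand-in for H5Preprocess.process(): 'element' here is the
    # (path, bytes) tuple emitted by the previous step, NOT a filename,
    # so h5py.File(element, 'r') would fail on it.
    path, raw = element
    file_like = io.BytesIO(raw)  # a file-like wrapper h5py could open
    yield (path, file_like.read(4))

records = [out
           for elem in read_gcs_blobs('gs://archs4/human_matrix.h5')
           for out in h5_preprocess(elem)]
print(records)  # [('gs://archs4/human_matrix.h5', b'\x89HDF')]
```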