Hello,

Any help will be greatly appreciated!!!

I am using Dataflow to process an H5 (HDF5 format
<https://support.hdfgroup.org/HDF5/>) file.

The H5 file was uploaded to Google Cloud Storage from:
https://amp.pharm.mssm.edu/archs4/download.html

H5 / HDF5 is a hierarchical data format for representing scientific data.
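
For anyone unfamiliar with the format, here is a minimal sketch of how
such a file is navigated with h5py (example.h5 is a placeholder; the
ARCHS4 file keeps its matrix under /data/expression):

import h5py

# Open an HDF5 file read-only; groups nest like directories, datasets like files
with h5py.File('example.h5', 'r') as hdf:
    def show(name, obj):
        print('%s -> %s' % (name, type(obj).__name__))
    hdf.visititems(show)  # e.g. "data -> Group", "data/expression -> Dataset"
    # Datasets slice like numpy arrays, reading only the requested block
    block = hdf['data/expression'][0:2, 0:2]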


My code:

I have created a setup.py file based on the juliaset
<https://github.com/apache/beam/blob/5c74022da1bb8f8a822cf3c545f96f2903d175a4/sdks/python/apache_beam/examples/complete/juliaset/setup.py>
example that was referenced in one of the other tickets. My only change
there is the list of packages to install:

REQUIRED_PACKAGES = [
    'numpy',
    'h5py',
    'pandas',
    'tables',
    ]
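
For reference, the rest of the file follows the juliaset template; a
condensed sketch of the setup() call (the package name and version here
are placeholders, and the template's CustomCommands machinery is left
out):

import setuptools

setuptools.setup(
    name='beam-h5-example',  # placeholder name
    version='0.0.1',
    install_requires=REQUIRED_PACKAGES,  # numpy, h5py, pandas, tables
    packages=setuptools.find_packages(),
    )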

The pipeline is the following:

import argparse
import io
import logging

import numpy as np
import h5py
import pandas as pd

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions


class ReadGcsBlobs(beam.DoFn):
    def process(self, element, *args, **kwargs):
        # Import inside process so the DoFn stays picklable for the workers
        from apache_beam.io.gcp import gcsio
        gcs = gcsio.GcsIO()
        # Read the whole blob into memory and emit a (path, contents) pair
        yield (element, gcs.open(element).read())

class H5Preprocess(beam.DoFn):
    def process(self, element):
        # element is the (path, contents) pair emitted by ReadGcsBlobs
        path, contents = element
        logging.info('**********starting to read H5')
        # h5py can open an in-memory file-like object (requires h5py >= 2.9)
        hdf = h5py.File(io.BytesIO(contents), 'r')
        logging.info('**********finished reading H5')
        expression = hdf['data']['expression']
        logging.info('**********finished reading the expression node')
        np_expression = expression[1:2, 1:2]
        logging.info('**********subset the expression to a small numpy array')
        yield (path, np_expression)

def run(argv=None):
    parser = argparse.ArgumentParser(description="read from h5 blob and write to file")
    _, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    # Ship the main session (and its imports) to the Dataflow workers
    pipeline_options.view_as(SetupOptions).save_main_session = True
    logging.info('**********finished with the parser')

    with beam.Pipeline(options=pipeline_options) as p:
        (p
            | 'Initialize' >> beam.Create(['gs://archs4/human_matrix.h5'])
            | 'Read-blobs' >> beam.ParDo(ReadGcsBlobs())
            | 'pre-process' >> beam.ParDo(H5Preprocess())
            | 'write' >> beam.io.WriteToText('gs://archs4/outputData.txt')
        )
        # the "with" block runs the pipeline on exit, so no explicit p.run() is needed

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

The execution command is the following:

python beam_try1.py --job-name beam-try1 --project orielresearch-188115 \
    --runner DataflowRunner --setup_file ./setup.py \
    --temp_location=gs://archs4/tmp --staging_location gs://archs4/staging
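
For reference, I believe the same script can be exercised locally by
swapping the runner (assuming local credentials can reach the gs://
paths), which helps separate pipeline-logic errors from Dataflow worker
issues:

python beam_try1.py --runner DirectRunner --temp_location=gs://archs4/tmp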

The pipeline error is the following:

(5a4c72cfc5507714): Workflow failed. Causes: (3bde8bf810c652b2):
S04:Initialize/Read+Read-blobs+pre-process+write/Write/WriteImpl/WriteBundles/WriteBundles+write/Write/WriteImpl/Pair+write/Write/WriteImpl/WindowInto(WindowIntoFn)+write/Write/WriteImpl/GroupByKey/Reify+write/Write/WriteImpl/GroupByKey/Write
failed., (7b4a7abb1a692d12): A work item was attempted 4 times without
success. Each time the worker eventually lost contact with the
service. The work item was attempted on:
  beamapp-eila-0213182449-2-02131024-1621-harness-vf4f,
  beamapp-eila-0213182449-2-02131024-1621-harness-vf4f,
  beamapp-eila-0213182449-2-02131024-1621-harness-vf4f,
  beamapp-eila-0213182449-2-02131024-1621-harness-vf4f

Could you please advise what needs to be fixed?

Thanks,

Eila


-- 
Eila
www.orielresearch.org
https://www.meetup.com/Deep-Learning-In-Production/
