On Mon, Apr 2, 2018 at 5:19 PM Eila Arich-Landkof <[email protected]> wrote:
> Hi Cham,
>
> Thanks. I have created a PCollection from the dataset that is available in
> the H5 file, which is provided as a numpy array. It is very challenging for
> my use case to describe the schema. The original dimensions of the dataset
> are 70K x 30K. Any suggestion how to work around that?

Can you write to a pre-created table by using "create_disposition =
BigQueryDisposition.CREATE_NEVER"? You can try to use BigQuery schema
auto-detection (https://cloud.google.com/bigquery/docs/schema-detect) to
create the table before running the Beam pipeline.

> I think that it was mentioned at the summit that there will be a way to
> write to BQ without a schema. Is something like that on the roadmap?

I don't think supporting this is on the immediate roadmap of Beam, but any
contributions in this space are welcome.

> Best,
> Eila
>
> Sent from my iPhone
>
> On Apr 2, 2018, at 7:33 PM, Chamikara Jayalath <[email protected]> wrote:
>
> (moving dev to bcc)
>
> Hi Eila,
>
> On Mon, Apr 2, 2018 at 3:50 PM OrielResearch Eila Arich-Landkof
> <[email protected]> wrote:
>
>> Hi All,
>>
>> I was able to make it work by creating the PCollection with the numpy
>> array. However, writing to BQ was impossible because it requested the
>> schema.
>>
>> The code:
>>
>> (p | "create all" >> beam.Create(expression[1:5,1:5])
>>    | "write all text" >> beam.io.WriteToText('gs://archs4/output/',
>>                                              file_name_suffix='.txt'))
>>
>> Is there a workaround for providing a schema for beam.io.BigQuerySink?
>
> Regarding your earlier question, you do need at least one element in the
> PCollection that triggers the ParDo to do any work (which can be a create
> with a single element that you ignore).
>
> Not sure if I fully understood the BigQuery question. You have to specify
> a schema when writing to a new BigQuery table.
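For a table this wide, the schema string that BigQuerySink accepts
("name:TYPE,name:TYPE,...") can be generated programmatically instead of
written by hand. A minimal sketch — the "c0", "c1", ... column names and
the FLOAT type are assumptions, not anything from the H5 file:

```python
def make_bq_schema(num_cols, prefix="c", col_type="FLOAT"):
    """Build a 'name:TYPE,name:TYPE,...' schema string for BigQuerySink.

    The column prefix and FLOAT type are placeholders; use whatever
    naming and types actually match the dataset.
    """
    return ",".join("%s%d:%s" % (prefix, i, col_type)
                    for i in range(num_cols))

# For a 30K-column dataset you would call make_bq_schema(30000);
# a 3-column example:
schema = make_bq_schema(3)
print(schema)  # c0:FLOAT,c1:FLOAT,c2:FLOAT
```

The resulting string can then be passed as the `schema` argument of
`beam.io.BigQuerySink` (or the table can be pre-created, as suggested
above, with `create_disposition=BigQueryDisposition.CREATE_NEVER`).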
> See the following example:
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L1085
>
> Thanks,
> Cham
>
>> Many thanks,
>> Eila
>>
>> On Mon, Apr 2, 2018 at 11:33 AM, OrielResearch Eila Arich-Landkof
>> <[email protected]> wrote:
>>
>>> Hello all,
>>>
>>> I would like to try a different way to leverage Apache Beam for H5 => BQ
>>> (file to table transfer).
>>>
>>> For my use case, I would like to read every 10K rows of H5 data (numpy
>>> array format), transpose them, and write them to BQ as 10K columns. 10K
>>> is the BQ column limit.
>>>
>>> My code is below and fires the following error (I might have missed
>>> something basic). I am not using beam.Create and am trying to create a
>>> PCollection from the ParDo transform. Is this possible? If not, what is
>>> the alternative for creating a PCollection from a numpy array (if any)?
>>>
>>> ERROR:root:Exception at bundle
>>> <apache_beam.runners.direct.bundle_factory._Bundle object at
>>> 0x7f00aad7b7a0>, due to an exception.
>>> Traceback (most recent call last):
>>>   File
>>> "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
>>> line 307, in call
>>>     side_input_values)
>>>   File
>>> "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
>>> line 332, in attempt_call
>>>     evaluator.start_bundle()
>>>   File
>>> "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py",
>>> line 540, in start_bundle
>>>     self._applied_ptransform.inputs[0].windowing,
>>> AttributeError: 'PBegin' object has no attribute 'windowing'
>>>
>>> ERROR:root:Giving up after 4 attempts.
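The 'PBegin' error above is the symptom of applying a ParDo as the first
transform: a ParDo consumes a PCollection, so the pipeline must start from a
source such as beam.Create. Separately, the slicing in the read10kRowsDoFn
code below indexes with commas (`expression[row_start,row_end,:]`) where a
row range is intended. The chunk-and-transpose step itself can be sketched
in plain numpy; `expression` here is a tiny hypothetical stand-in for the
real ~70K x 30K dataset:

```python
import numpy as np

# Small stand-in for the real ~70K x 30K H5 dataset.
expression = np.arange(12).reshape(6, 2)

def row_chunks_transposed(arr, chunk_size):
    """Yield each chunk_size-row block transposed, so that 10K rows
    become 10K columns. Note the slice arr[start:start + chunk_size, :],
    not arr[start, end, :], and that np.transpose returns a new view
    rather than modifying the array in place."""
    for start in range(0, arr.shape[0], chunk_size):
        yield np.transpose(arr[start:start + chunk_size, :])

chunks = list(row_chunks_transposed(expression, 4))
# First chunk: 4 rows x 2 cols transposed to 2 x 4;
# last (partial) chunk: the remaining 2 rows transposed to 2 x 2.
```

In a Beam pipeline, the `start` offsets would be the elements that seed the
ParDo (e.g. via beam.Create), and each yielded block would then be converted
to BigQuery rows.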
>>> WARNING:root:A task failed with exception: 'PBegin' object has no
>>> attribute 'windowing'
>>> WARNING:root:A task failed with exception: 'PBegin' object has no
>>> attribute 'windowing'
>>>
>>> Code:
>>>
>>> options = PipelineOptions()
>>> google_cloud_options = options.view_as(GoogleCloudOptions)
>>> google_cloud_options.project = 'orielresearch-188115'
>>> google_cloud_options.job_name = 'h5-to-bq-10K'
>>> google_cloud_options.staging_location = 'gs://archs4/staging'
>>> google_cloud_options.temp_location = 'gs://archs4/temp'
>>> options.view_as(StandardOptions).runner = 'DirectRunner'
>>>
>>> p = beam.Pipeline(options=options)
>>>
>>> class read10kRowsDoFn(beam.DoFn):
>>>     def process(self, element, index):
>>>         print(index)
>>>         row_start = index
>>>         row_end = index + 10000
>>>         # returns numpy array - numpy.ndarray
>>>         d = expression[row_start,row_end,:]
>>>         np.transpose(d)
>>>         return(d)
>>>
>>> # for i in range(0, expression.shape[0], 10000):
>>> k = 210  # allows creating unique labels for the runner
>>> for i in range(0, 3, 2):  # test
>>>     k += 1
>>>     bigQuery_dataset_table_name = (bigquery_dataset_name + '.' +
>>>                                    bigquery_table_name + str(k))
>>>     print(bigQuery_dataset_table_name)
>>>     label_read_row = "read " + bigQuery_dataset_table_name
>>>     label_write_col = "write " + bigQuery_dataset_table_name
>>>     # is it possible to generate a PCollection with ParDo without create?
>>>     (p | label_read_row >> beam.ParDo(read10kRowsDoFn(i))
>>>        | label_write_col >> beam.io.Write(
>>>            beam.io.BigQuerySink(bigQuery_dataset_table_name)))
>>>
>>> p.run().wait_until_finish()  # fires an error
>>>
>>> Many thanks,
>>>
>>> --
>>> Eila
>>> www.orielresearch.org
>>> https://www.meetup.com/Deep-Learning-In-Production/
>>
>> --
>> Eila
>> www.orielresearch.org
>> https://www.meetup.com/Deep-Learning-In-Production/
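Beyond seeding the pipeline with a source instead of a bare ParDo, the code
in the last message would also need its output elements converted: BigQuerySink
expects each element to be a dict mapping column name to value, not a raw
numpy row. A sketch of both pieces under assumed names (the "c0", "c1", ...
column names are placeholders and would have to match the table's schema):

```python
import numpy as np

def row_offsets(num_rows, chunk_size=10000):
    """Row offsets to seed the pipeline with, e.g.
    p | beam.Create(row_offsets(num_rows)) | beam.ParDo(...)
    so the ParDo has elements to trigger it."""
    return list(range(0, num_rows, chunk_size))

def to_bq_row(np_row, prefix="c"):
    """Convert one numpy row into the dict form that BigQuerySink
    expects. The 'c<i>' column names are an assumption."""
    return {"%s%d" % (prefix, j): float(v) for j, v in enumerate(np_row)}

offsets = row_offsets(70000)           # [0, 10000, ..., 60000]
row = to_bq_row(np.array([1.5, 2.5]))  # {'c0': 1.5, 'c1': 2.5}
```

With this shape, each DoFn invocation receives one offset as its element,
slices and transposes its 10K-row block, and yields one dict per output row.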
