Hi Cham,

Thanks. I have created a PCollection from the dataset that is available in the 
H5 file which is provided as numpy array. 
It is very challenging for my use case to describe the schema. The original 
dimensions of the dataset are 70K x 30K . Any suggestion how to work around 
that?

I think that it was mentioned at the summit that there will be a way to write 
to BQ without schema. Is something like that on the roadmap? 

Best,
Eila

Sent from my iPhone

> On Apr 2, 2018, at 7:33 PM, Chamikara Jayalath <[email protected]> wrote:
> 
> (moving dev to bcc)
> 
> Hi Eila,
> 
>> On Mon, Apr 2, 2018 at 3:50 PM OrielResearch Eila Arich-Landkof 
>> <[email protected]> wrote:
>> Hi All,
>> 
>> I was able to make it work by creating the PCollection with the numpy array. 
>> However, writing to BQ was impossible because it requested for the schema.
>> The code:
>> (p | "create all" >> beam.Create(expression[1:5,1:5])
>>    | "write all text" >> beam.io.WriteToText('gs://archs4/output/', 
>> file_name_suffix='.txt'))
>> 
>> Is there a walk around for providing schema for beam.io.BigQuerySink?
> 
> Regarding your earlier question, you do need at least one element in the 
> PCollection that triggers the ParDo to do any work (which can be a create 
> with a single element that you ignore).
> 
> Not sure if I fully understood the BigQuery question. You have to specify a 
> schema when writing to a new BigQuery table. See following example,
> 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L1085
> 
> 
> Thanks,
> Cham
> 
>  
>> 
>> Many thanks,
>> Eila
>> 
>>> On Mon, Apr 2, 2018 at 11:33 AM, OrielResearch Eila Arich-Landkof 
>>> <[email protected]> wrote:
>>> Hello all,
>>> 
>>> I would like to try a different way to leverage Apache beam for H5 => BQ 
>>> (file to table transfer).
>>> 
>>> For my use case, I would like to read every 10K rows of H5 data (numpy 
>>> array format), transpose them and write them to BQ 10K columns. 10K is BQ 
>>> columns limit.
>>> 
>>> My code is below and fires the following error (I might have missed 
>>> something basic). I am not using beam.Create and trying to create a 
>>> PCollection from the ParDo transfer. is this posssible? if not, what is the 
>>> alternative for creating a PColleciton from numpy array? (if any)
>>> 
>>> ERROR:root:Exception at bundle 
>>> <apache_beam.runners.direct.bundle_factory._Bundle object at 
>>> 0x7f00aad7b7a0>, due to an exception.
>>>  Traceback (most recent call last):
>>>   File 
>>> "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
>>>  line 307, in call
>>>     side_input_values)
>>>   File 
>>> "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
>>>  line 332, in attempt_call
>>>     evaluator.start_bundle()
>>>   File 
>>> "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py",
>>>  line 540, in start_bundle
>>>     self._applied_ptransform.inputs[0].windowing,
>>> AttributeError: 'PBegin' object has no attribute 'windowing'
>>> 
>>> ERROR:root:Giving up after 4 attempts.
>>> WARNING:root:A task failed with exception: 'PBegin' object has no attribute 
>>> 'windowing'
>>> WARNING:root:A task failed with exception: 'PBegin' object has no attribute 
>>> 'windowing'
>>> 
>>> 
>>> Code:
>>> 
>>> options = PipelineOptions()
>>> google_cloud_options = options.view_as(GoogleCloudOptions)
>>> google_cloud_options.project = 'orielresearch-188115'
>>> google_cloud_options.job_name = 'h5-to-bq-10K'
>>> google_cloud_options.staging_location = 'gs://archs4/staging'
>>> google_cloud_options.temp_location = 'gs://archs4/temp'
>>> options.view_as(StandardOptions).runner = 'DirectRunner'  
>>> 
>>> p = beam.Pipeline(options=options)
>>> 
>>> class read10kRowsDoFn(beam.DoFn):
>>>   def process(self, element,index):
>>>     print(index)
>>>     row_start = index
>>>     row_end = index+10000
>>>     # returns numpy array - numpy.ndarray
>>>     d = expression[row_start,row_end,:]
>>>     np.transpose(d)
>>>     return(d)
>>> 
>>> #for i in range(0,expression.shape[0],10000):
>>> k=210 # allows creating unique labels for the runner
>>> for i in range(0,3,2): # test 
>>>   k+=1
>>>   
>>> bigQuery_dataset_table_name=bigquery_dataset_name+'.'+bigquery_table_name+str(k)
>>>   print(bigQuery_dataset_table_name)
>>>   label_read_row = "read "+bigQuery_dataset_table_name
>>>   label_write_col = "write "+bigQuery_dataset_table_name
>>> # is this possible to generate a PCollection with ParDo without create?
>>>   (p | label_read_row >> beam.ParDo(read10kRowsDoFn(i))
>>>      | label_write_col >> 
>>> beam.io.Write(beam.io.BigQuerySink(bigQuery_dataset_table_name)))
>>> 
>>> p.run().wait_until_finish() #fires an error
>>> 
>>> Many thanks,
>>> 
>>> -- 
>>> Eila
>>> www.orielresearch.org
>>> https://www.meetup.com/Deep-Learning-In-Production/
>> 
>> 
>> 
>> -- 
>> Eila
>> www.orielresearch.org
>> https://www.meetup.com/Deep-Learning-In-Production/

Reply via email to