On Mon, Apr 2, 2018 at 5:19 PM Eila Arich-Landkof <[email protected]>
wrote:

> Hi Cham,
>
> Thanks. I have created a PCollection from the dataset that is available in
> the H5 file which is provided as numpy array.
> It is very challenging for my use case to describe the schema. The
> original dimensions of the dataset are 70K x 30K . Any suggestion how to
> work around that?
>
>
Can you write to a pre-created table by using "create_disposition =
BigQueryDisposition.CREATE_NEVER" ? You can try to use BigQuery schema auto
detection (https://cloud.google.com/bigquery/docs/schema-detect) to create
the table before running the Beam pipeline.


> I think that it was mentioned at the summit that there will be a way to
> write to BQ without schema. Is something like that on the roadmap?
>

I don't think supporting this is in the immediate road map of Beam but any
contributions in this space are welcome.


>
> Best,
> Eila
>
> Sent from my iPhone
>
> On Apr 2, 2018, at 7:33 PM, Chamikara Jayalath <[email protected]>
> wrote:
>
> (moving dev to bcc)
>
> Hi Eila,
>
> On Mon, Apr 2, 2018 at 3:50 PM OrielResearch Eila Arich-Landkof <
> [email protected]> wrote:
>
>> Hi All,
>>
>> I was able to make it work by creating the PCollection with the numpy
>> array. However, writing to BQ was impossible because it requested for the
>> schema.
>> The code:
>> (p | "create all" >> beam.Create(expression[1:5,1:5])
>>    | "write all text" >> beam.io.WriteToText('gs://archs4/output/',
>> file_name_suffix='.txt'))
>>
>> *Is there a walk around for providing schema for beam.io
>> <http://beam.io>.BigQuerySink?*
>>
>
> Regarding your earlier question, you do need at least one element in the
> PCollection that triggers the ParDo to do any work (which can be a create
> with a single element that you ignore).
>
> Not sure if I fully understood the BigQuery question. You have to specify
> a schema when writing to a new BigQuery table. See following example,
>
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/snippets.py#L1085
>
>
> Thanks,
> Cham
>
>
>
>>
>> Many thanks,
>> Eila
>>
>> On Mon, Apr 2, 2018 at 11:33 AM, OrielResearch Eila Arich-Landkof <
>> [email protected]> wrote:
>>
>>> Hello all,
>>>
>>> I would like to try a different way to leverage Apache beam for H5 => BQ
>>> (file to table transfer).
>>>
>>> For my use case, I would like to read every 10K rows of H5 data (numpy
>>> array format), transpose them and write them to BQ 10K columns. 10K is BQ
>>> columns limit.
>>>
>>> My code is below and fires the following error (I might have missed
>>> something basic). I am not using beam.Create and trying to create a
>>> PCollection from the ParDo transfer. is this posssible? if not, what is the
>>> alternative for creating a PColleciton from numpy array? (if any)
>>>
>>> ERROR:root:Exception at bundle 
>>> <apache_beam.runners.direct.bundle_factory._Bundle object at 
>>> 0x7f00aad7b7a0>, due to an exception.
>>>  Traceback (most recent call last):
>>>   File 
>>> "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
>>>  line 307, in call
>>>     side_input_values)
>>>   File 
>>> "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/executor.py",
>>>  line 332, in attempt_call
>>>     evaluator.start_bundle()
>>>   File 
>>> "/usr/local/envs/py2env/lib/python2.7/site-packages/apache_beam/runners/direct/transform_evaluator.py",
>>>  line 540, in start_bundle
>>>     self._applied_ptransform.inputs[0].windowing,
>>> AttributeError: 'PBegin' object has no attribute 'windowing'
>>>
>>> ERROR:root:Giving up after 4 attempts.
>>> WARNING:root:A task failed with exception: 'PBegin' object has no attribute 
>>> 'windowing'
>>> WARNING:root:A task failed with exception: 'PBegin' object has no attribute 
>>> 'windowing'
>>>
>>>
>>>
>>> *Code:*
>>>
>>> options = PipelineOptions()
>>> google_cloud_options = options.view_as(GoogleCloudOptions)
>>> google_cloud_options.project = 'orielresearch-188115'
>>> google_cloud_options.job_name = 'h5-to-bq-10K'
>>> google_cloud_options.staging_location = 'gs://archs4/staging'
>>> google_cloud_options.temp_location = 'gs://archs4/temp'
>>> options.view_as(StandardOptions).runner = 'DirectRunner'
>>>
>>> p = beam.Pipeline(options=options)
>>>
>>> class read10kRowsDoFn(beam.DoFn):
>>>   def process(self, element,index):
>>>     print(index)
>>>     row_start = index
>>>     row_end = index+10000
>>>     # returns numpy array - numpy.ndarray
>>>     d = expression[row_start,row_end,:]
>>>     np.transpose(d)
>>>     return(d)
>>>
>>> #for i in range(0,expression.shape[0],10000):
>>> k=210 # allows creating unique labels for the runner
>>> for i in range(0,3,2): # test
>>>   k+=1
>>>
>>> bigQuery_dataset_table_name=bigquery_dataset_name+'.'+bigquery_table_name+str(k)
>>>   print(bigQuery_dataset_table_name)
>>>   label_read_row = "read "+bigQuery_dataset_table_name
>>>   label_write_col = "write "+bigQuery_dataset_table_name
>>> *# is this possible to generate a PCollection with ParDo without create?*
>>>   (p | label_read_row >> beam.ParDo(read10kRowsDoFn(i))
>>>      | label_write_col >> beam.io.Write(beam.io
>>> .BigQuerySink(bigQuery_dataset_table_name)))
>>>
>>> p.run().wait_until_finish()* #fires an error*
>>>
>>> Many thanks,
>>>
>>> --
>>> Eila
>>> www.orielresearch.org
>>> https://www.meetup.com/Deep-Learning-In-Production/
>>>
>>
>>
>>
>> --
>> Eila
>> www.orielresearch.org
>> https://www.meetup.com/Deep-Learning-In-Production/
>>
>

Reply via email to