Shall I file a GH issue for that? Although I guess that's something on the
BQ side rather than the Beam side, right?

Thanks!!
On Sat, 24 Mar 2018 at 22:50, Ted Yu <[email protected]> wrote:

> It seems an improvement could be made: if CREATE_NEVER is set, a table
> with field-based partitioning shouldn't have to be associated with a schema.
>
> Cheers
>
> On Sat, Mar 24, 2018 at 2:30 PM, Carlos Alonso <[email protected]>
> wrote:
>
>> Otherwise the BQ load job fails with the above error as well (Table with
>> field based partitioning must have a schema).
>> On Sat, 24 Mar 2018 at 15:52, Eugene Kirpichov <[email protected]>
>> wrote:
>>
>>> Hmm, glad it worked, but - if your create disposition was CREATE_NEVER,
>>> then why implement getSchema at all?
>>>
>>>
>>> On Sat, Mar 24, 2018, 7:01 AM Carlos Alonso <[email protected]>
>>> wrote:
>>>
>>>> The thing is that the previous log line "Returning schema for ..." never
>>>> appears, so I don't think anything would show up in the log if I added the
>>>> logging you suggest either.
>>>>
>>>> Actually, after a couple more attempts, I changed the createDisposition
>>>> of the transform (from CREATE_NEVER to CREATE_IF_NEEDED) and it magically
>>>> worked... So I guess either something is wrong when CREATE_NEVER is set,
>>>> or there's something I don't understand...
>>>>
>>>> FYI my BigQueryIO looks like this
>>>>
>>>> BigQueryIO.write()
>>>>   .to(new JsonRouter(dataset))
>>>>   .withFormatFunction(i => JsonRouter.jsonToRow(i._1))
>>>>   .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>   .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
>>>>   .withMethod(Write.Method.FILE_LOADS)
>>>>
>>>>
>>>> Thanks!
>>>>
>>>> On Fri, Mar 23, 2018 at 11:08 PM Eugene Kirpichov <[email protected]>
>>>> wrote:
>>>>
>>>>> Can you try logging the result of your BigQueryUtil.parseSchema and
>>>>> confirm that it is always non-empty? What does the result look like for
>>>>> the table that's failing to load?
>>>>>
>>>>> On Fri, Mar 23, 2018 at 6:01 PM Carlos Alonso <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi everyone!!
>>>>>>
>>>>>> When trying to insert into BigQuery using dynamic destinations I get
>>>>>> this error: "Table with field based partitioning must have a schema",
>>>>>> which suggests that I'm not providing such a schema. I don't understand
>>>>>> why, as I think I am. Here: https://pastebin.com/Q1jF024B you can find
>>>>>> the full stack trace, and below you can see the code of the
>>>>>> DynamicDestinations implementation. Basically I'm dumping a PubSub
>>>>>> stream of heterogeneous Json documents into BQ and routing each type to
>>>>>> its corresponding table.
>>>>>>
>>>>>> The tuples contain the Json document and the schema itself for the
>>>>>> corresponding table (the tuple is built in an earlier transform from a
>>>>>> side input, as the schema is read from BQ using the BigQueryClient
>>>>>> class), and the destination KV[String, String] is supposed to hold the
>>>>>> table name as key and the schema as value.
>>>>>>
>>>>>> The logs show many entries for "Returning destination for...", a few
>>>>>> "Returning table for..." ones and no "Returning schema for..." at all,
>>>>>> which may explain why BQ complains that no schema is provided. The
>>>>>> question then is: why is that method never invoked?
>>>>>>
>>>>>> class JsonRouter(dataset: String)
>>>>>>   extends DynamicDestinations[(Json, String), KV[String, String]] {
>>>>>>
>>>>>>   import JsonRouter._
>>>>>>
>>>>>>   override def getDestination(
>>>>>>       element: ValueInSingleWindow[(Json, String)]): KV[String, String] = {
>>>>>>     log.debug(s"Returning destination for ${element.getValue}")
>>>>>>     KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
>>>>>>   }
>>>>>>
>>>>>>   override def getSchema(element: KV[String, String]): TableSchema = {
>>>>>>     log.debug(s"Returning schema for ${element.getKey}")
>>>>>>     BigQueryUtil.parseSchema(element.getValue)
>>>>>>   }
>>>>>>
>>>>>>   override def getTable(element: KV[String, String]): TableDestination = {
>>>>>>     log.debug(s"Returning table for ${element.getKey}")
>>>>>>     new TableDestination(
>>>>>>       s"$dataset.${element.getKey}",
>>>>>>       s"Table to store ${element.getKey}",
>>>>>>       BQTypesRouter.TimePartitioning)
>>>>>>   }
>>>>>>
>>>>>>   override def getDestinationCoder: Coder[KV[String, String]] =
>>>>>>     KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
>>>>>> }
>>>>>>
>>>>>>
>>>>>> Thanks!!
>>>>>>
>>>>>
>

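Eugene's suggestion in this thread, logging the result of BigQueryUtil.parseSchema and confirming it is non-empty, can be turned into a fail-fast check. The following is a minimal plain-Scala sketch that does not use Beam or the BigQuery client. `Field`, `parseSchema` and `checkedSchema` are hypothetical names: the schema is modelled as a list of `Field(name, type)` values, and `parseSchema` is a stand-in for the thread's BigQueryUtil.parseSchema that assumes a simple "name:TYPE,name:TYPE" string, because the real helper's input format isn't shown.

```scala
// Minimal sketch, assuming a hypothetical "name:TYPE,name:TYPE" schema string.
// Field and parseSchema are illustrative stand-ins, not BigQuery's TableSchema
// API or the thread's BigQueryUtil.parseSchema.
object SchemaCheck {
  final case class Field(name: String, tpe: String)

  // Hypothetical parser: splits on commas, ignores blank entries and
  // rejects entries that lack a ':' separator.
  def parseSchema(raw: String): List[Field] =
    raw.split(",").toList.map(_.trim).filter(_.nonEmpty).map { entry =>
      entry.split(":", 2) match {
        case Array(name, tpe) => Field(name.trim, tpe.trim)
        case _ =>
          throw new IllegalArgumentException(s"Malformed schema entry: '$entry'")
      }
    }

  // Logs what was parsed for the table, then fails fast on an empty schema,
  // so the problem surfaces here instead of as a BigQuery load-job error.
  def checkedSchema(table: String, raw: String): List[Field] = {
    val fields = parseSchema(raw)
    println(s"Schema for $table: " + fields.map(f => s"${f.name}:${f.tpe}").mkString(", "))
    require(fields.nonEmpty, s"Empty schema for table $table")
    fields
  }
}
```

In JsonRouter, the same log-and-require step would wrap the BigQueryUtil.parseSchema call inside getSchema, using the existing `log` rather than println. It only helps if getSchema is actually called, though. In this thread the "Returning schema for..." line never appeared while the create disposition was CREATE_NEVER.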