Hmm, glad it worked, but - if your create disposition was CREATE_NEVER,
then why implement getSchema at all?
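
(With CREATE_NEVER the tables must already exist, so as far as I understand
Beam never consults the schema and the override could be left trivial - a
rough, untested sketch, assuming a null schema is acceptable when no tables
are created:)

  // Sketch only: under CREATE_NEVER the tables already exist, so the schema
  // should never be needed and can simply be omitted.
  override def getSchema(element: KV[String, String]): TableSchema = null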

On Sat, Mar 24, 2018, 7:01 AM Carlos Alonso <[email protected]> wrote:

> The thing is that the previous log line "Returning schema for ..." never
> appears, so I don't think anything will show up in the log if I add the
> logging you suggest either.
>
> Actually, after a couple more attempts, I changed the createDisposition of
> the transform (from CREATE_NEVER to CREATE_IF_NEEDED) and it magically
> worked... So I guess either there's something wrong when CREATE_NEVER is set
> or there's something I don't understand...
>
> FYI my BigQueryIO looks like this
>
> BigQueryIO.write()
>   .to(new JsonRouter(dataset))
>   .withFormatFunction(i => JsonRouter.jsonToRow(i._1))
>   .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
>   .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
>   .withMethod(Write.Method.FILE_LOADS)
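>
> (The failing variant only differed in the create disposition, i.e. it had
>
>   .withCreateDisposition(Write.CreateDisposition.CREATE_NEVER)
>
> instead.)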
>
>
> Thanks!
>
> On Fri, Mar 23, 2018 at 11:08 PM Eugene Kirpichov <[email protected]>
> wrote:
>
>> Can you try logging the result of your BigQueryUtil.parseSchema and
>> confirm that it is always non-empty? What does the result look like for the
>> table that's failing to load?
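>>
>> Something like this (an untested sketch that just adds logging to your
>> existing getSchema) would surface it:
>>
>>   override def getSchema(element: KV[String, String]): TableSchema = {
>>     val schema = BigQueryUtil.parseSchema(element.getValue)
>>     // Log the parsed result so an empty or unexpected schema shows up
>>     // for the table that fails to load.
>>     log.debug(s"Parsed schema for ${element.getKey}: $schema")
>>     schema
>>   }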
>>
>> On Fri, Mar 23, 2018 at 6:01 PM Carlos Alonso <[email protected]>
>> wrote:
>>
>>> Hi everyone!!
>>>
>>> When trying to insert into BigQuery using dynamic destinations I get this
>>> error: "Table with field based partitioning must have a schema", which
>>> suggests that I'm not providing such a schema, and I don't understand why,
>>> as I think I am. Here: https://pastebin.com/Q1jF024B you can find the full
>>> stack trace, and below you can see the code of the DynamicDestinations
>>> implementation. Basically I'm dumping a stream of heterogeneous Json
>>> documents from PubSub into BQ, routing each type to its corresponding table.
>>>
>>> The tuples contain the Json document and the schema for the corresponding
>>> table (the tuple is composed in a previous transform from a side input, as
>>> the schema is read from BQ using the BigQueryClient class), and the
>>> destination KV[String, String] is supposed to hold the table name as key
>>> and the schema as value.
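>>>
>>> (Roughly, that previous transform looks something like the sketch below;
>>> the names AttachSchema, schemasView and docs are simplified placeholders,
>>> and the side input is a map of table name -> schema JSON read from BQ.)
>>>
>>> import org.apache.beam.sdk.transforms.{DoFn, ParDo}
>>> import org.apache.beam.sdk.transforms.DoFn.ProcessElement
>>> import org.apache.beam.sdk.values.PCollectionView
>>>
>>> class AttachSchema(schemasView: PCollectionView[java.util.Map[String, String]])
>>>   extends DoFn[Json, (Json, String)] {
>>>
>>>   @ProcessElement
>>>   def process(c: ProcessContext): Unit = {
>>>     val doc = c.element()
>>>     // Pair each document with the schema of the table it will route to.
>>>     c.output((doc, c.sideInput(schemasView).get(JsonRouter.jsonToTableName(doc))))
>>>   }
>>> }
>>>
>>> // docs.apply(ParDo.of(new AttachSchema(schemasView)).withSideInputs(schemasView))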
>>>
>>> The logs show many "Returning destination for..." entries, a few "Returning
>>> table..." ones and no "Returning schema for..." at all, which may explain
>>> why BQ complains that no schema is provided. The question then is: why is
>>> that method never invoked?
>>>
>>> class JsonRouter(dataset: String)
>>>   extends DynamicDestinations[(Json, String), KV[String, String]] {
>>>
>>>   import JsonRouter._
>>>
>>>   override def getDestination(
>>>       element: ValueInSingleWindow[(Json, String)]): KV[String, String] = {
>>>     log.debug(s"Returning destination for ${element.getValue}")
>>>     KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
>>>   }
>>>
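>>>   // NB: this is the method whose "Returning schema for..." line never appears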
>>>   override def getSchema(element: KV[String, String]): TableSchema = {
>>>     log.debug(s"Returning schema for ${element.getKey}")
>>>     BigQueryUtil.parseSchema(element.getValue)
>>>   }
>>>
>>>   override def getTable(element: KV[String, String]): TableDestination = {
>>>     log.debug(s"Returning table for ${element.getKey}")
>>>     new TableDestination(s"$dataset.${element.getKey}",
>>>       s"Table to store ${element.getKey}", BQTypesRouter.TimePartitioning)
>>>   }
>>>
>>>   override def getDestinationCoder: Coder[KV[String, String]] =
>>>     KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
>>> }
>>>
>>>
>>> Thanks!!
>>>
>>
