The thing is that the existing log line "Returning schema for ..." never
appears, so I don't think logging what you suggest would show anything
either.

Actually, after a couple more attempts, I changed the createDisposition of
the transform (from CREATE_NEVER to CREATE_IF_NEEDED) and it magically
worked... So I guess either something goes wrong when CREATE_NEVER is set, or
there's something I don't understand...

FYI, my BigQueryIO looks like this:

BigQueryIO.write()
  .to(new JsonRouter(dataset))
  .withFormatFunction(i => JsonRouter.jsonToRow(i._1))
  .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
  .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
  .withMethod(Write.Method.FILE_LOADS)
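
Since getSchema is never reached, a check along the lines of the one
suggested below could instead run where the destination KV is built, because
getDestination does get called. This is only a minimal sketch in plain Scala:
SchemaCheck and validate are hypothetical names that are not part of Beam or
of JsonRouter, and the check only confirms that a schema string is present,
not that it parses into a valid TableSchema.

```scala
// Hypothetical helper for illustration only; not part of Beam or JsonRouter.
object SchemaCheck {

  /** Returns Right((table, schemaJson)) when a schema string is present,
    * or Left(reason) when it is null or blank.
    * Assumption: the schema travels as a JSON string next to the table name,
    * as in the KV[String, String] destination described in this thread.
    */
  def validate(table: String, schemaJson: String): Either[String, (String, String)] = {
    // Treat null the same as an empty string so callers get one failure mode.
    val trimmed = Option(schemaJson).map(_.trim).getOrElse("")
    if (trimmed.isEmpty) Left(s"Empty schema for table $table")
    else Right((table, trimmed))
  }
}
```

Something like SchemaCheck.validate(tableName, schemaString) could then be
logged from getDestination, so that a missing schema shows up in the logs even
when getSchema is not invoked.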


Thanks!

On Fri, Mar 23, 2018 at 11:08 PM Eugene Kirpichov <[email protected]>
wrote:

> Can you try logging the result of your BigQueryUtil.parseSchema and
> confirm that it is always non-empty? What does the result look like for the
> table that's failing to load?
>
> On Fri, Mar 23, 2018 at 6:01 PM Carlos Alonso <[email protected]>
> wrote:
>
>> Hi everyone!!
>>
>> When trying to insert into BigQuery using dynamic destinations I get this
>> error: "Table with field based partitioning must have a schema", which
>> suggests that I'm not providing a schema, and I don't understand why, as
>> I think I am. You can find the full stack trace here:
>> https://pastebin.com/Q1jF024B and the code of the DynamicDestinations
>> implementation below. Basically I'm dumping a PubSub stream of
>> heterogeneous Json documents into BQ, routing each type to its
>> corresponding table.
>>
>> The tuples contain the Json document and the schema for the
>> corresponding table (the tuple is composed in a previous transform
>> from a side input, as the schema is read from BQ using the BigQueryClient
>> class), and the destination KV[String, String] is supposed to hold the
>> table name as key and the schema as value.
>>
>> The logs show many entries for "Returning destination for...", a few
>> "Returning table..." ones and no "Returning schema for..." at all, which may
>> explain why BQ complains that no schema is provided. The question would
>> then be... Why is that method never invoked?
>>
>> class JsonRouter(dataset: String)
>>   extends DynamicDestinations[(Json, String), KV[String, String]] {
>>
>>   import JsonRouter._
>>
>>   override def getDestination(
>>       element: ValueInSingleWindow[(Json, String)]): KV[String, String] = {
>>     log.debug(s"Returning destination for ${element.getValue}")
>>     KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
>>   }
>>
>>   override def getSchema(element: KV[String, String]): TableSchema = {
>>     log.debug(s"Returning schema for ${element.getKey}")
>>     BigQueryUtil.parseSchema(element.getValue)
>>   }
>>
>>   override def getTable(element: KV[String, String]): TableDestination = {
>>     log.debug(s"Returning table for ${element.getKey}")
>>     new TableDestination(
>>       s"$dataset.${element.getKey}",
>>       s"Table to store ${element.getKey}",
>>       BQTypesRouter.TimePartitioning)
>>   }
>>
>>   override def getDestinationCoder: Coder[KV[String, String]] =
>>     KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
>> }
>>
>>
>> Thanks!!
>>
>
