Can you try logging the result of your BigQueryUtil.parseSchema and confirm
that it is always non-empty? What does the result look like for the table
that's failing to load?
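
A rough sketch of one way to do that check follows. It assumes parseSchema
returns a com.google.api.services.bigquery.model.TableSchema, as the
getSchema signature in your code suggests. SchemaCheck and describe are
hypothetical names, not part of Beam or your project.

```scala
import com.google.api.services.bigquery.model.TableSchema
import scala.collection.JavaConverters._

// Hypothetical helper: turns a parsed schema into a one-line summary for logging.
object SchemaCheck {
  def describe(schema: TableSchema): String = {
    // Both the schema and its field list can be null, so guard each with Option.
    val fields = Option(schema)
      .flatMap(s => Option(s.getFields))
      .map(_.asScala.toList)
      .getOrElse(Nil)

    if (fields.isEmpty) "EMPTY schema"
    else fields.map(f => s"${f.getName}:${f.getType}").mkString(", ")
  }
}
```

You could then log SchemaCheck.describe(BigQueryUtil.parseSchema(element.getValue))
from getDestination, which your logs show is being called, and look for
"EMPTY schema" on the table that fails to load.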

On Fri, Mar 23, 2018 at 6:01 PM Carlos Alonso <[email protected]> wrote:

> Hi everyone!!
>
> When trying to insert into BigQuery using dynamic destinations I get this
> error: "Table with field based partitioning must have a schema" that
> suggests that I'm not providing such a schema and I don't understand why as
> I think I am. Here: https://pastebin.com/Q1jF024B you can find the full
> stack trace and below you can see the code of the DynamicDestinations
> implementation. Basically I'm dumping a PubSub stream of heterogeneous
> Json documents into BQ, routing each type to its corresponding table.
>
> The tuples contain the Json document and the schema for the
> corresponding table (the tuple is composed in a previous transform
> from a side input, as the schema is read from BQ using the BigQueryClient
> class). The destination KV[String, String] is supposed to hold the
> table name as key and the schema as value.
>
> The logs show many entries for "Returning destination for...", a few
> "Returning table..." ones and no "Returning schema for..." at all, which may
> explain why BQ complains that no schema is provided. The question would
> then be: why is that method never invoked?
>
> class JsonRouter(dataset: String)
>   extends DynamicDestinations[(Json, String), KV[String, String]] {
>
>   import JsonRouter._
>
>   override def getDestination(
>       element: ValueInSingleWindow[(Json, String)]): KV[String, String] = {
>     log.debug(s"Returning destination for ${element.getValue}")
>     KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
>   }
>
>   override def getSchema(element: KV[String, String]): TableSchema = {
>     log.debug(s"Returning schema for ${element.getKey}")
>     BigQueryUtil.parseSchema(element.getValue)
>   }
>
>   override def getTable(element: KV[String, String]): TableDestination = {
>     log.debug(s"Returning table for ${element.getKey}")
>     new TableDestination(s"$dataset.${element.getKey}",
>       s"Table to store ${element.getKey}",
>       BQTypesRouter.TimePartitioning)
>   }
>
>   override def getDestinationCoder: Coder[KV[String, String]] =
>     KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
> }
>
>
> Thanks!!
>
