Can you try logging the result of your BigQueryUtil.parseSchema and confirm that it is always non-empty? What does the result look like for the table that's failing to load?
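A minimal sketch of that check, in case it helps. `SchemaDebug` and `describe` are hypothetical names, not part of Beam or of your code. It assumes the value returned by `BigQueryUtil.parseSchema` is a `com.google.api.services.bigquery.model.TableSchema`, which is what your `getSchema` override returns. Since your logs show `getDestination` being called, that may be the easiest place to log from.

```scala
import com.google.api.services.bigquery.model.{TableFieldSchema, TableSchema}
import scala.collection.JavaConverters._ // deprecated on 2.13 but still available

// Hypothetical helper: turns a parsed schema into a one-line summary for logging.
object SchemaDebug {
  def describe(table: String, schema: TableSchema): String = {
    // getFields returns null when no fields were set, so guard both levels.
    val fields: Seq[TableFieldSchema] =
      Option(schema)
        .flatMap(s => Option(s.getFields))
        .map(_.asScala.toSeq)
        .getOrElse(Seq.empty)

    if (fields.isEmpty) s"EMPTY schema for $table"
    else fields
      .map(f => s"${f.getName}:${f.getType}")
      .mkString(s"Schema for $table: ", ", ", "")
  }
}

// Possible use inside getDestination (assumption: same log and helpers as in your class):
//   val (json, rawSchema) = element.getValue
//   log.debug(SchemaDebug.describe(jsonToTableName(json), BigQueryUtil.parseSchema(rawSchema)))
```

If any table logs "EMPTY schema", that would point at the schema string in the tuple rather than at BigQueryIO.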
On Fri, Mar 23, 2018 at 6:01 PM Carlos Alonso <[email protected]> wrote:
> Hi everyone!!
>
> When trying to insert into BigQuery using dynamic destinations I get this
> error: "Table with field based partitioning must have a schema" that
> suggests that I'm not providing such a schema and I don't understand why as
> I think I am. Here: https://pastebin.com/Q1jF024B you can find the full
> stack trace and below you can see the code of the DynamicDestinations
> implementation. Basically I'm dumping a stream of PubSub into BQ being that
> stream of heterogeneous Json documents and routing each type to its
> corresponding table.
>
> The tuples contain the Json document and the schema itself for the
> corresponding table (the tuple is composed in a previous transform
> from a side input as the schema is read from BQ using BigQueryClient
> class), and the Destination KV[String, String] is supposed to hold the
> table name as key and the schema as value.
>
> The logs show many entries for "Returning destination for...", a few of
> "Returning table..." ones and no "Returning schema for..." at all which may
> indicate why BQ complains that no schema is provided, the question would
> then be... Why is that method never invoked?
>
> class JsonRouter(dataset: String)
>     extends DynamicDestinations[(Json, String), KV[String, String]] {
>
>   import JsonRouter._
>
>   override def getDestination(element: ValueInSingleWindow[(Json, String)]): KV[String, String] = {
>     log.debug(s"Returning destination for ${element.getValue}")
>     KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
>   }
>
>   override def getSchema(element: KV[String, String]): TableSchema = {
>     log.debug(s"Returning schema for ${element.getKey}")
>     BigQueryUtil.parseSchema(element.getValue)
>   }
>
>   override def getTable(element: KV[String, String]): TableDestination = {
>     log.debug(s"Returning table for ${element.getKey}")
>     new TableDestination(s"$dataset.${element.getKey}", s"Table to store ${element.getKey}",
>       BQTypesRouter.TimePartitioning)
>   }
>
>   override def getDestinationCoder: Coder[KV[String, String]] =
>     KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
> }
>
> Thanks!!
