The thing is that the existing "Returning schema for ..." log line never appears, so I don't think anything would show up in the logs if I also logged what you suggest.
Actually, after a couple more attempts, I changed the createDisposition of the transform (from CREATE_NEVER to CREATE_IF_NEEDED) and it magically worked... So I guess either something is wrong when CREATE_NEVER is set, or there is something I don't understand. FYI, my BigQueryIO looks like this:

BigQueryIO.write()
  .to(new JsonRouter(dataset))
  .withFormatFunction(i => JsonRouter.jsonToRow(i._1))
  .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
  .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
  .withMethod(Write.Method.FILE_LOADS)

Thanks!

On Fri, Mar 23, 2018 at 11:08 PM Eugene Kirpichov wrote:

> Can you try logging the result of your BigQueryUtil.parseSchema and
> confirm that it is always non-empty? What does the result look like for the
> table that's failing to load?
>
> On Fri, Mar 23, 2018 at 6:01 PM Carlos Alonso wrote:
>
>> Hi everyone!!
>>
>> When trying to insert into BigQuery using dynamic destinations I get this
>> error: "Table with field based partitioning must have a schema", which
>> suggests that I'm not providing such a schema, and I don't understand why,
>> as I think I am. Here: https://pastebin.com/Q1jF024B you can find the full
>> stack trace, and below you can see the code of the DynamicDestinations
>> implementation. Basically I'm dumping a PubSub stream of heterogeneous
>> Json documents into BQ and routing each type to its corresponding table.
>>
>> The tuples contain the Json document and the schema for the corresponding
>> table (the tuple is composed in a previous transform from a side input, as
>> the schema is read from BQ using the BigQueryClient class), and the
>> Destination KV[String, String] is supposed to hold the table name as key
>> and the schema as value.
>>
>> The logs show many entries for "Returning destination for...", a few
>> "Returning table..." ones and no "Returning schema for..."
>> at all, which may indicate why BQ complains that no schema is provided. The
>> question would then be... why is that method never invoked?
>>
>> class JsonRouter(dataset: String)
>>   extends DynamicDestinations[(Json, String), KV[String, String]] {
>>
>>   import JsonRouter._
>>
>>   override def getDestination(element: ValueInSingleWindow[(Json, String)]): KV[String, String] = {
>>     log.debug(s"Returning destination for ${element.getValue}")
>>     KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
>>   }
>>
>>   override def getSchema(element: KV[String, String]): TableSchema = {
>>     log.debug(s"Returning schema for ${element.getKey}")
>>     BigQueryUtil.parseSchema(element.getValue)
>>   }
>>
>>   override def getTable(element: KV[String, String]): TableDestination = {
>>     log.debug(s"Returning table for ${element.getKey}")
>>     new TableDestination(s"$dataset.${element.getKey}", s"Table to store ${element.getKey}",
>>       BQTypesRouter.TimePartitioning)
>>   }
>>
>>   override def getDestinationCoder: Coder[KV[String, String]] =
>>     KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
>> }
>>
>>
>> Thanks!!
>>
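For reasoning about the routing described in this thread, a minimal, dependency-free Scala sketch of the same contract may help. Each element is mapped to a destination key that carries both the table name and its schema JSON, and the table spec and schema are then derived from that key alone. This is not Beam's DynamicDestinations API: `Destination`, `RouterModel`, `RouterModelDemo`, the `"type"` field, the `Map[String, String]` stand-in for Json documents and `my_dataset` are all hypothetical. The empty-schema check only mirrors Eugene's suggestion to confirm that the parsed schema is always non-empty.

```scala
// Hypothetical, dependency-free model of the JsonRouter routing contract.
// This is NOT Beam's DynamicDestinations API; all names are illustrative.

// Plays the role of the KV[String, String] destination: table name + schema JSON.
final case class Destination(table: String, schemaJson: String)

object RouterModel {
  // A flat Map[String, String] stands in for a Json document (assumption).
  type Doc = Map[String, String]

  // Like getDestination: pick the table from a hypothetical "type" field
  // and carry the schema JSON along inside the destination key.
  def destinationFor(doc: Doc, schemaJson: String): Destination =
    Destination(doc.getOrElse("type", "unknown"), schemaJson)

  // Like getTable: build the "dataset.table" spec from the key alone.
  def tableSpec(dataset: String, d: Destination): String =
    s"$dataset.${d.table}"

  // Like getSchema, plus the suggested sanity check: reject a blank schema.
  def schemaFor(d: Destination): Either[String, String] =
    if (d.schemaJson.trim.isEmpty) Left(s"Empty schema for table ${d.table}")
    else Right(d.schemaJson)

  // Group a batch of (document, schema) tuples by their destination key.
  def route(elements: Seq[(Doc, String)]): Map[Destination, Seq[Doc]] =
    elements
      .groupBy { case (doc, schema) => destinationFor(doc, schema) }
      .map { case (dest, pairs) => dest -> pairs.map(_._1) }
}

object RouterModelDemo {
  def main(args: Array[String]): Unit = {
    val schema = """{"fields":[{"name":"id","type":"STRING"}]}"""
    val batch = Seq(
      (Map("type" -> "clicks", "id" -> "1"), schema),
      (Map("type" -> "views", "id" -> "2"), schema),
      (Map("type" -> "clicks", "id" -> "3"), schema)
    )
    // Print one line per destination: table spec, document count, schema check.
    for ((dest, docs) <- RouterModel.route(batch)) {
      val spec = RouterModel.tableSpec("my_dataset", dest)
      println(s"$spec <- ${docs.size} docs, schema ok: ${RouterModel.schemaFor(dest).isRight}")
    }
  }
}
```

One consequence of using KV[String, String] (table name plus schema string) as the destination type, visible in the model: two elements bound for the same table but carrying different schema strings become two distinct destinations, because the key compares both parts.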
