Otherwise the BQ load job fails with the same error (Table with field based partitioning must have a schema).

On Sat, 24 Mar 2018 at 15:52, Eugene Kirpichov wrote:
> Hmm, glad it worked, but if your create disposition was CREATE_NEVER,
> then why implement getSchema at all?
>
> On Sat, Mar 24, 2018, 7:01 AM Carlos Alonso wrote:
>
>> The thing is that the previous log "Returning schema for ..." never
>> appears, so I don't think anything will appear in the log if I log what you
>> suggest too.
>>
>> Actually, after a couple more attempts, I changed the createDisposition of
>> the transform (from CREATE_NEVER to CREATE_IF_NEEDED) and it magically
>> worked... So I guess there's something wrong when CREATE_NEVER is set, or
>> something I don't understand...
>>
>> FYI my BigQueryIO looks like this:
>>
>> BigQueryIO.write()
>>   .to(new JsonRouter(dataset))
>>   .withFormatFunction(i => JsonRouter.jsonToRow(i._1))
>>   .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
>>   .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
>>   .withMethod(Write.Method.FILE_LOADS)
>>
>> Thanks!
>>
>> On Fri, Mar 23, 2018 at 11:08 PM Eugene Kirpichov wrote:
>>
>>> Can you try logging the result of your BigQueryUtil.parseSchema and
>>> confirm that it is always non-empty? What does the result look like for the
>>> table that's failing to load?
>>>
>>> On Fri, Mar 23, 2018 at 6:01 PM Carlos Alonso wrote:
>>>
>>>> Hi everyone!!
>>>>
>>>> When trying to insert into BigQuery using dynamic destinations I get
>>>> this error: "Table with field based partitioning must have a schema". It
>>>> suggests that I'm not providing a schema, and I don't understand why, as
>>>> I think I am. Here: https://pastebin.com/Q1jF024B you can find the
>>>> full stack trace, and below you can see the code of the DynamicDestinations
>>>> implementation. Basically I'm dumping a PubSub stream of heterogeneous
>>>> JSON documents into BQ and routing each document type to its
>>>> corresponding table.
>>>>
>>>> Each tuple contains the JSON document and the schema of its
>>>> corresponding table (the tuple is built in an earlier transform from a
>>>> side input, since the schema is read from BQ using the BigQueryClient
>>>> class), and the destination KV[String, String] is supposed to hold the
>>>> table name as key and the schema as value.
>>>>
>>>> The logs show many entries for "Returning destination for...", a few for
>>>> "Returning table..." and none at all for "Returning schema for...", which
>>>> may explain why BQ complains that no schema is provided. The question
>>>> then is: why is that method never invoked?
>>>>
>>>> import com.google.api.services.bigquery.model.TableSchema
>>>> import org.apache.beam.sdk.coders.{Coder, KvCoder, StringUtf8Coder}
>>>> import org.apache.beam.sdk.io.gcp.bigquery.{DynamicDestinations, TableDestination}
>>>> import org.apache.beam.sdk.values.{KV, ValueInSingleWindow}
>>>>
>>>> // Json, BigQueryUtil, BQTypesRouter and the JsonRouter companion object
>>>> // (jsonToTableName, jsonToRow, log) are project helpers not shown here.
>>>> class JsonRouter(dataset: String)
>>>>     extends DynamicDestinations[(Json, String), KV[String, String]] {
>>>>
>>>>   import JsonRouter._
>>>>
>>>>   // Key: target table name derived from the document; value: that table's schema.
>>>>   override def getDestination(element: ValueInSingleWindow[(Json, String)]): KV[String, String] = {
>>>>     log.debug(s"Returning destination for ${element.getValue}")
>>>>     KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
>>>>   }
>>>>
>>>>   override def getSchema(element: KV[String, String]): TableSchema = {
>>>>     log.debug(s"Returning schema for ${element.getKey}")
>>>>     BigQueryUtil.parseSchema(element.getValue)
>>>>   }
>>>>
>>>>   override def getTable(element: KV[String, String]): TableDestination = {
>>>>     log.debug(s"Returning table for ${element.getKey}")
>>>>     new TableDestination(
>>>>       s"$dataset.${element.getKey}",
>>>>       s"Table to store ${element.getKey}",
>>>>       BQTypesRouter.TimePartitioning)
>>>>   }
>>>>
>>>>   override def getDestinationCoder: Coder[KV[String, String]] =
>>>>     KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
>>>> }
>>>>
>>>> Thanks!!
>>>>
>>>
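The behavior reported in this thread can be summarized with a small, dependency-free Scala model. This is a sketch under stated assumptions, not Beam's source: `DispositionModel`, `schemaForLoad`, `loadSucceeds`, `CreateNever` and `CreateIfNeeded` are hypothetical stand-ins for BigQueryIO's `Write.CreateDisposition` handling. It only encodes what the logs in the thread show: with CREATE_NEVER the `getSchema` callback is never consulted, so the load job carries no schema, which BigQuery rejects for a table with field-based partitioning.

```scala
object DispositionModel {
  // Hypothetical stand-ins for BigQueryIO's Write.CreateDisposition values.
  sealed trait CreateDisposition
  case object CreateNever extends CreateDisposition
  case object CreateIfNeeded extends CreateDisposition

  // Simplified model (assumption, not Beam's code): the schema callback is only
  // invoked when the sink may create the table; with CreateNever no schema is
  // attached to the load job.
  def schemaForLoad(disposition: CreateDisposition, getSchema: () => String): Option[String] =
    disposition match {
      case CreateNever    => None
      case CreateIfNeeded => Some(getSchema())
    }

  // Per the error in this thread: a load into a field-partitioned table needs a schema.
  def loadSucceeds(partitioned: Boolean, schema: Option[String]): Boolean =
    !partitioned || schema.isDefined

  def main(args: Array[String]): Unit = {
    for (d <- Seq(CreateNever, CreateIfNeeded)) {
      val schema = schemaForLoad(d, () => "schema-json")
      println(s"$d -> schema passed: ${schema.isDefined}, load ok: ${loadSucceeds(partitioned = true, schema)}")
    }
  }
}
```

In the model, CreateNever yields no schema and a failed load, while CreateIfNeeded calls the schema function and the load succeeds, which matches the fix Carlos found by switching to CREATE_IF_NEEDED.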
