Shall I file a GH issue for that? Although I guess that's something on the BQ side rather than the Beam side, right?
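For reference, the improvement Ted suggests can be written down as a small rule. The sketch below is a hypothetical plain-Scala model: `CreateDisposition`, `CreateNever`, `CreateIfNeeded`, `LoadRequest` and `SchemaRule` are illustrative names, not Beam or BigQuery API. It only encodes the behaviour reported in this thread and the proposed relaxation.

```scala
// Hypothetical model, not Beam or BigQuery API: it only encodes the rule
// discussed in this thread.
sealed trait CreateDisposition
case object CreateNever extends CreateDisposition
case object CreateIfNeeded extends CreateDisposition

// A simplified load request: the partitioning field (if any), the schema
// fields supplied with the request, and the create disposition.
final case class LoadRequest(
    partitionField: Option[String],
    schemaFields: Seq[String],
    createDisposition: CreateDisposition
)

object SchemaRule {
  val MissingSchema = "Table with field based partitioning must have a schema"

  // Behaviour reported in the thread: a field-partitioned load without a
  // schema is rejected whatever the create disposition is.
  def currentError(req: LoadRequest): Option[String] =
    if (req.partitionField.isDefined && req.schemaFields.isEmpty) Some(MissingSchema)
    else None

  // Proposed relaxation: skip the check when the table is never created.
  def proposedError(req: LoadRequest): Option[String] =
    req.createDisposition match {
      case CreateNever    => None
      case CreateIfNeeded => currentError(req)
    }
}
```

The relaxation in `proposedError` rests on an assumption: with CREATE_NEVER the destination table must already exist, so its schema is presumably available from the table itself rather than from the load request.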
Thanks!!

On Sat, 24 Mar 2018 at 22:50, Ted Yu <[email protected]> wrote:

> It seems an improvement can be made where if CREATE_NEVER is present, table
> with field based partitioning doesn't have to be associated with a schema.
>
> Cheers
>
> On Sat, Mar 24, 2018 at 2:30 PM, Carlos Alonso <[email protected]>
> wrote:
>
>> Otherwise the BQ load job fails with the above error as well (Table with
>> field based partitioning must have a schema).
>> On Sat, 24 Mar 2018 at 15:52, Eugene Kirpichov <[email protected]>
>> wrote:
>>
>>> Hmm, glad it worked, but - if your create disposition was CREATE_NEVER,
>>> then why implement getSchema at all?
>>>
>>>
>>> On Sat, Mar 24, 2018, 7:01 AM Carlos Alonso <[email protected]>
>>> wrote:
>>>
>>>> The thing is that the previous log "Returning schema for ..." never
>>>> appears, so I don't think anything will appear in the log if I log what you
>>>> suggest too.
>>>>
>>>> Actually, after a couple more attempts, I changed the createDisposition
>>>> of the transform (from CREATE_NEVER to CREATE_IF_NEEDED) and it magically
>>>> worked... So I guess there's something wrong when CREATE_NEVER is set or
>>>> something I don't understand...
>>>>
>>>> FYI my BigQueryIO looks like this
>>>>
>>>>   BigQueryIO.write()
>>>>     .to(new JsonRouter(dataset))
>>>>     .withFormatFunction(i => JsonRouter.jsonToRow(i._1))
>>>>     .withCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>     .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
>>>>     .withMethod(Write.Method.FILE_LOADS)
>>>>
>>>>
>>>> Thanks!
>>>>
>>>> On Fri, Mar 23, 2018 at 11:08 PM Eugene Kirpichov <[email protected]>
>>>> wrote:
>>>>
>>>>> Can you try logging the result of your BigQueryUtil.parseSchema and
>>>>> confirm that it is always non-empty? What does the result look like for
>>>>> the table that's failing to load?
>>>>>
>>>>> On Fri, Mar 23, 2018 at 6:01 PM Carlos Alonso <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi everyone!!
>>>>>>
>>>>>> When trying to insert into BigQuery using dynamic destinations I get
>>>>>> this error: "Table with field based partitioning must have a schema",
>>>>>> which suggests that I'm not providing such a schema, and I don't
>>>>>> understand why, as I think I am. Here: https://pastebin.com/Q1jF024B
>>>>>> you can find the full stack trace, and below you can see the code of
>>>>>> the DynamicDestinations implementation. Basically I'm dumping a PubSub
>>>>>> stream of heterogeneous JSON documents into BQ and routing each type
>>>>>> to its corresponding table.
>>>>>>
>>>>>> The tuples contain the JSON document and the schema for the
>>>>>> corresponding table (the tuple is composed in a previous transform
>>>>>> from a side input, as the schema is read from BQ using the
>>>>>> BigQueryClient class), and the destination KV[String, String] is
>>>>>> supposed to hold the table name as key and the schema as value.
>>>>>>
>>>>>> The logs show many entries for "Returning destination for...", a few
>>>>>> "Returning table..." ones and no "Returning schema for..." at all.
>>>>>> That may explain why BQ complains that no schema is provided, so the
>>>>>> question becomes: why is that method never invoked?
>>>>>>
>>>>>> class JsonRouter(dataset: String)
>>>>>>     extends DynamicDestinations[(Json, String), KV[String, String]] {
>>>>>>
>>>>>>   import JsonRouter._
>>>>>>
>>>>>>   override def getDestination(element: ValueInSingleWindow[(Json, String)]): KV[String, String] = {
>>>>>>     log.debug(s"Returning destination for ${element.getValue}")
>>>>>>     KV.of(jsonToTableName(element.getValue._1), element.getValue._2)
>>>>>>   }
>>>>>>
>>>>>>   override def getSchema(element: KV[String, String]): TableSchema = {
>>>>>>     log.debug(s"Returning schema for ${element.getKey}")
>>>>>>     BigQueryUtil.parseSchema(element.getValue)
>>>>>>   }
>>>>>>
>>>>>>   override def getTable(element: KV[String, String]): TableDestination = {
>>>>>>     log.debug(s"Returning table for ${element.getKey}")
>>>>>>     new TableDestination(s"$dataset.${element.getKey}", s"Table to store ${element.getKey}",
>>>>>>       BQTypesRouter.TimePartitioning)
>>>>>>   }
>>>>>>
>>>>>>   override def getDestinationCoder: Coder[KV[String, String]] =
>>>>>>     KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())
>>>>>> }
>>>>>>
>>>>>>
>>>>>> Thanks!!
>>>>>>
>>>>>
>
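One way to reason about the logs reported in this thread (many getDestination calls, a few getTable calls, no getSchema calls under CREATE_NEVER, and a successful load after switching to CREATE_IF_NEEDED) is a toy model of the dynamic-destinations call pattern. This is a hypothetical plain-Scala simplification, not Beam's BigQueryIO code: `Destinations`, `ModelSink` and `PairRouter` are illustrative names, JSON documents are plain strings with a made-up `type:` prefix, and the rule that the schema is only consulted under CREATE_IF_NEEDED is an assumption inferred from the thread, not a documented Beam guarantee.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the DynamicDestinations contract (the real class
// is org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations).
trait Destinations[T, D] {
  def getDestination(element: T): D
  def getTable(destination: D): String
  def getSchema(destination: D): String
}

sealed trait CreateDisposition
case object CreateNever extends CreateDisposition
case object CreateIfNeeded extends CreateDisposition

// Toy sink (assumption): getDestination runs once per element, getTable once
// per distinct destination, getSchema only when tables may need creating.
final class ModelSink[T, D](dests: Destinations[T, D], create: CreateDisposition) {
  val calls = mutable.ListBuffer.empty[String]

  def write(elements: Seq[T]): Map[String, Option[String]] = {
    val destinations = elements.map { e =>
      calls += "destination"
      dests.getDestination(e)
    }
    destinations.distinct.map { d =>
      calls += "table"
      val table = dests.getTable(d)
      val schema = create match {
        case CreateIfNeeded => calls += "schema"; Some(dests.getSchema(d))
        case CreateNever    => None // schema is never consulted
      }
      table -> schema
    }.toMap
  }
}

// Mirrors JsonRouter: element = (jsonDoc, schemaJson),
// destination = (tableName, schemaJson).
final class PairRouter(dataset: String)
    extends Destinations[(String, String), (String, String)] {
  def getDestination(e: (String, String)): (String, String) = (tableNameOf(e._1), e._2)
  def getTable(d: (String, String)): String = s"$dataset.${d._1}"
  def getSchema(d: (String, String)): String = d._2
  // Hypothetical: the table name is the prefix before ':' (e.g. "click:{...}").
  private def tableNameOf(doc: String): String = doc.takeWhile(_ != ':')
}
```

Writing three documents of two types with `CreateNever` records three destination calls, two table calls and no schema calls, which has the same shape as the logs described in the original message.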
