I am trying to read from an unbounded source and use FILE_LOADS instead of
streaming inserts when writing to BigQuery. If I don't have the following two lines

    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
    .withTriggeringFrequency(Duration.standardMinutes(10))

my code works just fine, but uses streaming inserts (I've included that working
variant for comparison below). As soon as I add them, I get an
IllegalArgumentException without any message and the following stack trace:
Exception in thread "main" java.lang.IllegalArgumentException
    at com.google.common.base.Preconditions.checkArgument(Preconditions.java:108)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:557)
    at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:78)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:1694)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1638)
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1070)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
    at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:181)
Line 181 is the first line of the following code excerpt:
    BigQueryIO.<Event>write()
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        .withTriggeringFrequency(Duration.standardMinutes(10))
        .to(
            new DynamicDestinations<Event, String>() {
              @Override
              public String getDestination(ValueInSingleWindow<Event> element) {
                return element.getValue().getTopic();
              }

              @Override
              public TableDestination getTable(String destination) {
                return new TableDestination(
                    "charged-dialect-824:KafkaStaging.test",
                    null,
                    new TimePartitioning().setType("DAY"));
              }

              @Override
              public TableSchema getSchema(String destination) {
                return inputMessagesConfig.getTableSchema(destination);
              }
            })
        .withFormatFunction(
            (SerializableFunction<Event, TableRow>) event -> convertUserEventToTableRow(event))
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
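For comparison, here is the same write with just those two lines removed; this
is the variant that runs fine for me, using streaming inserts. The
DynamicDestinations and format function are exactly the ones from the excerpt
above, elided here to keep it short:

    BigQueryIO.<Event>write()
        .to(/* same DynamicDestinations<Event, String> as above */)
        .withFormatFunction(
            (SerializableFunction<Event, TableRow>) event -> convertUserEventToTableRow(event))
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE);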
I am not sure what I am doing wrong here. I tried higher values for the
triggering frequency, but that didn't help. I also wasn't able to find the
root cause of the exception with the debugger. Any idea how I can get to the
bottom of this?
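Update: one thing I did notice is that the failing checkArgument sits in
BatchLoads.expandTriggered (BatchLoads.java:212). From my possibly wrong
reading of the Beam source, that precondition could be about the number of
file shards, which I never set anywhere. So my unverified guess is that the
triggered FILE_LOADS path wants something like:

    .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
    .withTriggeringFrequency(Duration.standardMinutes(10))
    // guess: numFileShards must be > 0 when a triggering frequency is set;
    // 1000 is an arbitrary placeholder, not a validated value
    .withNumFileShards(1000)

Can anyone confirm whether withNumFileShards is the missing piece?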
Tobi