Hey Tobias,

org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)



points to the following code snippet (starting at BatchLoads.java:210):

if (bigQueryServices == null) {
  try {
    GcsPath.fromUri(tempLocation);
  } catch (IllegalArgumentException e) {
    throw new IllegalArgumentException(
        String.format(
            "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
            tempLocation),
        e);
  }
}



Are you sure your tempLocation is set correctly? I guess it’s needed to stage
files for a BigQuery load job, which FILE_LOADS uses instead of streaming
inserts.
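
One way to rule this out is to check the value yourself before building the
pipeline. Below is a minimal sketch using only the JDK. It is a simplified
stand-in for what GcsPath.fromUri checks, not Beam's actual validation; the
class name TempLocationCheck, its methods, and the sample bucket name are
hypothetical. In a Beam pipeline the value normally comes from the
tempLocation pipeline option (for example --tempLocation=gs://...).

```java
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical pre-flight check for a 'gs://' temp location (not part of Beam). */
final class TempLocationCheck {
  private TempLocationCheck() {}

  /** Returns true if the value parses as a URI with scheme "gs" and a bucket name. */
  static boolean isValidGcsPath(String location) {
    if (location == null || location.isEmpty()) {
      return false;
    }
    try {
      URI uri = new URI(location);
      String bucket = uri.getAuthority();
      return "gs".equals(uri.getScheme()) && bucket != null && !bucket.isEmpty();
    } catch (URISyntaxException e) {
      // Not parseable as a URI at all, so it cannot be a gs:// path.
      return false;
    }
  }

  /** Fails fast with a descriptive message, similar in spirit to the snippet above. */
  static void requireValidGcsPath(String location) {
    if (!isValidGcsPath(location)) {
      throw new IllegalArgumentException(
          String.format("Expected a valid 'gs://' path, but was given '%s'", location));
    }
  }

  public static void main(String[] args) {
    // Sample values only; "my-bucket" is a placeholder bucket name.
    for (String candidate : new String[] {"gs://my-bucket/tmp", "/tmp/beam"}) {
      System.out.println(candidate + " -> " + isValidGcsPath(candidate));
    }
  }
}
```

Calling requireValidGcsPath(options.getTempLocation()) right before
pipeline.apply(...) would at least give you a readable message if the value is
missing or is not a GCS path.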



Wout



From: "Kaymak, Tobias" <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Wednesday, 10 October 2018 at 14:18
To: "[email protected]" <[email protected]>
Subject: How to use of BigQueryIO Method.FILE_LOADS when reading from a 
unbounded source?

I am trying to read from an unbounded source and write to BigQuery using 
FILE_LOADS instead of streaming inserts.

If I don't have the following two lines

  .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
  .withTriggeringFrequency(Duration.standardMinutes(10))

my code works fine, but it uses streaming inserts. If I add them, I get a 
stack trace without any message:

Exception in thread "main" java.lang.IllegalArgumentException
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:108)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:557)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:78)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:1694)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1638)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1070)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
at ch.ricardo.di.beam.KafkaToBigQuery.main(KafkaToBigQuery.java:181)

where line 181 is the first line of the following code excerpt:

BigQueryIO.<Event>write()
  .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
  .withTriggeringFrequency(Duration.standardMinutes(10))

  .to(
      new DynamicDestinations<Event, String>() {
        @Override
        public String getDestination(ValueInSingleWindow<Event> element) {
          return element.getValue().getTopic();
        }

        @Override
        public TableDestination getTable(String destination) {
          return new TableDestination(
              "charged-dialect-824:KafkaStaging.test",
              null,
              new TimePartitioning().setType("DAY"));
        }

        @Override
        public TableSchema getSchema(String destination) {
          return inputMessagesConfig.getTableSchema(destination);
        }
      })
  .withFormatFunction(
      (SerializableFunction<Event, TableRow>)
          event -> convertUserEventToTableRow(event))
  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

I am not sure what I am doing wrong here. I tried higher values for the 
Duration, but that didn't help. I wasn't able to find the root cause of the 
exception with the debugger. Any idea how I can get to the bottom of this?

Tobi
