On the surface, the pipeline looks fine. Since this is a Java pipeline running locally, I would recommend taking a thread dump with jstack to see where the process is stuck.
On Fri, Sep 20, 2019 at 10:39 AM Daniel Amadei <[email protected]> wrote: > Hi all, > > I'm trying to run the following code using direct runner, consuming from a > pub sub topic and sending aggregate data to bigquery. When ran locally > using DirectRunner, the code gets stuck after the window. > > If ran using DataflowRunner, it runs successfully. > > Any ideas? > > Thanks > Daniel Amadei > > > import java.time.Instant; > import java.util.ArrayList; > import java.util.List; > > import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; > import org.apache.beam.sdk.Pipeline; > import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; > import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; > import org.apache.beam.sdk.options.Default; > import org.apache.beam.sdk.options.Description; > import org.apache.beam.sdk.options.PipelineOptionsFactory; > import org.apache.beam.sdk.transforms.DoFn; > import org.apache.beam.sdk.transforms.ParDo; > import org.apache.beam.sdk.transforms.Sum; > import org.apache.beam.sdk.transforms.windowing.SlidingWindows; > import org.apache.beam.sdk.transforms.windowing.Window; > import org.joda.time.Duration; > > import com.google.api.services.bigquery.model.TableFieldSchema; > import com.google.api.services.bigquery.model.TableRow; > import com.google.api.services.bigquery.model.TableSchema; > > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > > public class StreamDemoConsumer { > > private static final Logger LOG = > LoggerFactory.getLogger(StreamDemoConsumer.class); > > public static interface MyOptions extends DataflowPipelineOptions { > @Description("Output BigQuery table <project_id>:<dataset_id>.<table_id>") > @Default.String("<project-id>:demos.streamdemo") > String getOutput(); > > void setOutput(String s); > > @Description("Input topic") > @Default.String("projects/<project-id>/topics/streamdemo") > String getInput(); > > void setInput(String s); > } > > @SuppressWarnings("serial") > public static void 
main(String[] args) { > MyOptions options = > PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class); > options.setStreaming(true); > Pipeline p = Pipeline.create(options); > > String topic = options.getInput(); > String output = options.getOutput(); > > // Build the table schema for the output table. > List<TableFieldSchema> fields = new ArrayList<>(); > fields.add(new > TableFieldSchema().setName("timestamp").setType("TIMESTAMP")); > fields.add(new TableFieldSchema().setName("num_words").setType("INTEGER")); > TableSchema schema = new TableSchema().setFields(fields); > > p // > .apply("GetMessages", PubsubIO.readStrings().fromTopic(topic)) // > .apply("window", > Window.into(SlidingWindows// > .of(Duration.standardMinutes(2))// > .every(Duration.standardSeconds(30)))) // > .apply("WordsPerLine", ParDo.of(new DoFn<String, Integer>() { > @ProcessElement > public void processElement(ProcessContext c) throws Exception { > String line = c.element(); > LOG.info(line); > c.output(line.split(" ").length); > } > }))// > .apply("WordsInTimeWindow", Sum.integersGlobally().withoutDefaults()) // > .apply("ToBQRow", ParDo.of(new DoFn<Integer, TableRow>() { > @ProcessElement > public void processElement(ProcessContext c) throws Exception { > LOG.info(c.element().toString()); > > TableRow row = new TableRow(); > row.set("timestamp", Instant.now().toString()); > row.set("num_words", c.element()); > c.output(row); > } > })) // > .apply(BigQueryIO.writeTableRows().to(output)// > .withSchema(schema)// > .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) > > .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); > > p.run(); > } > } >
