On the surface, the pipeline looks fine.
As this is a Java pipeline running locally, I would recommend checking the
thread stacks using jstack to see where the process is stuck.



On Fri, Sep 20, 2019 at 10:39 AM Daniel Amadei <[email protected]>
wrote:

> Hi all,
>
> I'm trying to run the following code using the DirectRunner, consuming from a
> Pub/Sub topic and sending aggregate data to BigQuery. When run locally
> using DirectRunner, the code gets stuck after the window.
>
> If run using DataflowRunner, it runs successfully.
>
> Any ideas?
>
> Thanks
> Daniel Amadei
>
>
> import java.time.Instant;
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
> import org.apache.beam.sdk.options.Default;
> import org.apache.beam.sdk.options.Description;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.Sum;
> import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
> import org.apache.beam.sdk.transforms.windowing.Window;
> import org.joda.time.Duration;
>
> import com.google.api.services.bigquery.model.TableFieldSchema;
> import com.google.api.services.bigquery.model.TableRow;
> import com.google.api.services.bigquery.model.TableSchema;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
>
> /**
>  * Streaming Beam pipeline that reads text messages from a Pub/Sub topic, counts
>  * the words seen in each sliding window, and appends the per-window totals to a
>  * BigQuery table.
>  */
> public class StreamDemoConsumer {
>
> private static final Logger LOG =
> LoggerFactory.getLogger(StreamDemoConsumer.class);
>
> /**
>  * Pipeline options for this job: everything in DataflowPipelineOptions plus the
>  * input topic and the output table.
>  */
> public static interface MyOptions extends DataflowPipelineOptions {
> @Description("Output BigQuery table <project_id>:<dataset_id>.<table_id>")
> @Default.String("<project-id>:demos.streamdemo")
> String getOutput();
>
> void setOutput(String s);
>
> @Description("Input topic")
> @Default.String("projects/<project-id>/topics/streamdemo")
> String getInput();
>
> void setInput(String s);
> }
>
> /**
>  * Builds the pipeline from the command-line options and starts it.
>  *
>  * @param args command-line arguments parsed into {@link MyOptions}
>  */
> // NOTE(review): "serial" presumably silences the missing-serialVersionUID
> // warning for the anonymous DoFn subclasses below - confirm.
> @SuppressWarnings("serial")
> public static void main(String[] args) {
> MyOptions options =
> PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
> // Set after argument parsing, so streaming is enabled regardless of the args.
> options.setStreaming(true);
> Pipeline p = Pipeline.create(options);
>
> String topic = options.getInput();
> String output = options.getOutput();
>
> // Build the table schema for the output table.
> List<TableFieldSchema> fields = new ArrayList<>();
> fields.add(new
> TableFieldSchema().setName("timestamp").setType("TIMESTAMP"));
> fields.add(new TableFieldSchema().setName("num_words").setType("INTEGER"));
> TableSchema schema = new TableSchema().setFields(fields);
>
> // Pipeline: read -> window -> count words per message -> sum per window
> // -> convert to TableRow -> write to BigQuery.
> p //
> .apply("GetMessages", PubsubIO.readStrings().fromTopic(topic)) //
> // Two-minute sliding windows, with a new window starting every 30 seconds.
> .apply("window",
> Window.into(SlidingWindows//
> .of(Duration.standardMinutes(2))//
> .every(Duration.standardSeconds(30)))) //
> // Logs each message and emits its token count. Splitting on a single space
> // means consecutive spaces yield empty tokens that are counted too.
> .apply("WordsPerLine", ParDo.of(new DoFn<String, Integer>() {
> @ProcessElement
> public void processElement(ProcessContext c) throws Exception {
> String line = c.element();
> LOG.info(line);
> c.output(line.split(" ").length);
> }
> }))//
> // Sums the per-message counts.
> // NOTE(review): withoutDefaults() is understood to suppress the default
> // output for empty input, as needed with non-global windows - confirm
> // against the Beam Combine documentation.
> .apply("WordsInTimeWindow", Sum.integersGlobally().withoutDefaults()) //
> // Logs each sum and wraps it in a row matching the schema built above.
> .apply("ToBQRow", ParDo.of(new DoFn<Integer, TableRow>() {
> @ProcessElement
> public void processElement(ProcessContext c) throws Exception {
> LOG.info(c.element().toString());
>
> TableRow row = new TableRow();
> // The timestamp is the wall-clock time at which this element is processed
> // (java.time.Instant.now()), not a timestamp taken from the window.
> row.set("timestamp", Instant.now().toString());
> row.set("num_words", c.element());
> c.output(row);
> }
> })) //
> // Appends rows to the output table, creating the table if needed.
> .apply(BigQueryIO.writeTableRows().to(output)//
> .withSchema(schema)//
> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>
> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
>
> // Starts the pipeline; the value returned by run() is not kept or waited on.
> p.run();
> }
> }
>

Reply via email to