It seems that all the worker threads are blocked reading data from Pub/Sub.
Can you try pulling a few Pub/Sub messages from your command line using: gcloud beta pubsub subscriptions pull <your subscription> --limit=1
Example thread stack: "direct-runner-worker" #19 prio=5 os_prio=31 cpu=5955.37ms elapsed=1441.13s tid=0x00007fa12204d000 nid=0xa30f runnable [0x0000700002a55000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0([email protected]/Native Method) at java.net.SocketInputStream.socketRead([email protected] /SocketInputStream.java:115) at java.net.SocketInputStream.read([email protected] /SocketInputStream.java:168) at java.net.SocketInputStream.read([email protected] /SocketInputStream.java:140) at sun.security.ssl.SSLSocketInputRecord.read([email protected] /SSLSocketInputRecord.java:448) at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket([email protected] /SSLSocketInputRecord.java:68) at sun.security.ssl.SSLSocketImpl.readApplicationRecord([email protected] /SSLSocketImpl.java:1104) at sun.security.ssl.SSLSocketImpl$AppInputStream.read([email protected] /SSLSocketImpl.java:823) - locked <0x0000000625ce3f28> (a sun.security.ssl.SSLSocketImpl$AppInputStream) at java.io.BufferedInputStream.fill([email protected] /BufferedInputStream.java:252) at java.io.BufferedInputStream.read1([email protected] /BufferedInputStream.java:292) at java.io.BufferedInputStream.read([email protected] /BufferedInputStream.java:351) - locked <0x0000000624b3ae28> (a java.io.BufferedInputStream) at sun.net.www.http.HttpClient.parseHTTPHeader([email protected] /HttpClient.java:746) at sun.net.www.http.HttpClient.parseHTTP([email protected] /HttpClient.java:689) at sun.net.www.protocol.http.HttpURLConnection.getInputStream0([email protected] /HttpURLConnection.java:1604) - locked <0x0000000625cdc8a8> (a sun.net.www.protocol.https.DelegateHttpsURLConnection) at sun.net.www.protocol.http.HttpURLConnection.getInputStream([email protected] /HttpURLConnection.java:1509) - locked <0x0000000625cdc8a8> (a 
sun.net.www.protocol.https.DelegateHttpsURLConnection) at java.net.HttpURLConnection.getResponseCode([email protected] /HttpURLConnection.java:527) at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode([email protected] /HttpsURLConnectionImpl.java:329) at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37) at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:143) at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84) at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1011) at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:499) at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432) at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:549) at org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient.pull(PubsubJsonClient.java:162) at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubReader.pull(PubsubUnboundedSource.java:719) at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubReader.advance(PubsubUnboundedSource.java:858) at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.startReader(UnboundedReadEvaluatorFactory.java:238) at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:133) at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160) at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124) at java.util.concurrent.Executors$RunnableAdapter.call([email protected] /Executors.java:515) at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264) at java.util.concurrent.ThreadPoolExecutor.runWorker([email 
protected] /ThreadPoolExecutor.java:1128) at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected] /ThreadPoolExecutor.java:628) at java.lang.Thread.run([email protected]/Thread.java:834)
On Fri, Sep 20, 2019 at 11:44 AM Daniel Amadei <[email protected]> wrote: > Hi Ankur, > > I tried that, but nothing stood out to me as the culprit. Stack trace > attached in case you want to take a look. > > Thanks > Daniel > > Em sex, 20 de set de 2019 às 14:44, Ankur Goenka <[email protected]> > escreveu: > >> On the surface, the pipeline looks fine. >> As this is a Java pipeline running locally, I would recommend checking >> the thread stacks using jstack to see where the process is stuck. >> >> >> >> On Fri, Sep 20, 2019 at 10:39 AM Daniel Amadei <[email protected]> >> wrote: >> >>> Hi all, >>> >>> I'm trying to run the following code using the direct runner, consuming from >>> a Pub/Sub topic and sending aggregate data to BigQuery. When run locally >>> using DirectRunner, the code gets stuck after the window. >>> >>> When run using DataflowRunner, it runs successfully. >>> >>> Any ideas?
>>> >>> Thanks >>> Daniel Amadei >>> >>> >>> import java.time.Instant; >>> import java.util.ArrayList; >>> import java.util.List; >>> >>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; >>> import org.apache.beam.sdk.Pipeline; >>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; >>> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; >>> import org.apache.beam.sdk.options.Default; >>> import org.apache.beam.sdk.options.Description; >>> import org.apache.beam.sdk.options.PipelineOptionsFactory; >>> import org.apache.beam.sdk.transforms.DoFn; >>> import org.apache.beam.sdk.transforms.ParDo; >>> import org.apache.beam.sdk.transforms.Sum; >>> import org.apache.beam.sdk.transforms.windowing.SlidingWindows; >>> import org.apache.beam.sdk.transforms.windowing.Window; >>> import org.joda.time.Duration; >>> >>> import com.google.api.services.bigquery.model.TableFieldSchema; >>> import com.google.api.services.bigquery.model.TableRow; >>> import com.google.api.services.bigquery.model.TableSchema; >>> >>> import org.slf4j.Logger; >>> import org.slf4j.LoggerFactory; >>> >>> >>> public class StreamDemoConsumer { >>> >>> private static final Logger LOG = >>> LoggerFactory.getLogger(StreamDemoConsumer.class); >>> >>> public static interface MyOptions extends DataflowPipelineOptions { >>> @Description("Output BigQuery table >>> <project_id>:<dataset_id>.<table_id>") >>> @Default.String("<project-id>:demos.streamdemo") >>> String getOutput(); >>> >>> void setOutput(String s); >>> >>> @Description("Input topic") >>> @Default.String("projects/<project-id>/topics/streamdemo") >>> String getInput(); >>> >>> void setInput(String s); >>> } >>> >>> @SuppressWarnings("serial") >>> public static void main(String[] args) { >>> MyOptions options = >>> PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class); >>> options.setStreaming(true); >>> Pipeline p = Pipeline.create(options); >>> >>> String topic = options.getInput(); >>> String output = 
options.getOutput(); >>> >>> // Build the table schema for the output table. >>> List<TableFieldSchema> fields = new ArrayList<>(); >>> fields.add(new >>> TableFieldSchema().setName("timestamp").setType("TIMESTAMP")); >>> fields.add(new >>> TableFieldSchema().setName("num_words").setType("INTEGER")); >>> TableSchema schema = new TableSchema().setFields(fields); >>> >>> p // >>> .apply("GetMessages", PubsubIO.readStrings().fromTopic(topic)) // >>> .apply("window", >>> Window.into(SlidingWindows// >>> .of(Duration.standardMinutes(2))// >>> .every(Duration.standardSeconds(30)))) // >>> .apply("WordsPerLine", ParDo.of(new DoFn<String, Integer>() { >>> @ProcessElement >>> public void processElement(ProcessContext c) throws Exception { >>> String line = c.element(); >>> LOG.info(line); >>> c.output(line.split(" ").length); >>> } >>> }))// >>> .apply("WordsInTimeWindow", Sum.integersGlobally().withoutDefaults()) // >>> .apply("ToBQRow", ParDo.of(new DoFn<Integer, TableRow>() { >>> @ProcessElement >>> public void processElement(ProcessContext c) throws Exception { >>> LOG.info(c.element().toString()); >>> >>> TableRow row = new TableRow(); >>> row.set("timestamp", Instant.now().toString()); >>> row.set("num_words", c.element()); >>> c.output(row); >>> } >>> })) // >>> .apply(BigQueryIO.writeTableRows().to(output)// >>> .withSchema(schema)// >>> .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) >>> >>> .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); >>> >>> p.run(); >>> } >>> } >>> >> > > -- > Abraços, > Daniel Amadei (http://www.amadei.com.br) >
