Hi,

Are you using a very small volume of PubSub messages for testing purposes,
for example manually sending just one message to see it work its way through?
I don't have the Jira number to hand, but if I recall correctly the PubsubIO
connector for the DirectRunner had an issue around moving the watermark in
testing scenarios, i.e. at very low QPS. If that is the case here, could you
try with some more volume on the PubSub topic?

Cheers
Reza
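For reference, a small publisher loop is enough to generate that volume.
Below is a minimal sketch (not from the original thread) using the
google-cloud-pubsub Java client; the project id and topic name are
placeholders matching the pipeline options further down, and BurstPublisher
is a made-up class name. See also the TestStream sketch at the end of the
thread for a fully deterministic alternative.

import java.util.concurrent.TimeUnit;

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;

public class BurstPublisher {
  public static void main(String[] args) throws Exception {
    // Placeholder names; substitute your own project and topic.
    ProjectTopicName topic = ProjectTopicName.of("<project-id>", "streamdemo");
    Publisher publisher = Publisher.newBuilder(topic).build();
    try {
      // Publish a burst of messages so the unbounded source sees real traffic.
      for (int i = 0; i < 100; i++) {
        PubsubMessage msg = PubsubMessage.newBuilder()
            .setData(ByteString.copyFromUtf8("hello world number " + i))
            .build();
        publisher.publish(msg); // returns an ApiFuture<String>; ignored here
      }
    } finally {
      publisher.shutdown();
      publisher.awaitTermination(1, TimeUnit.MINUTES); // let pending publishes flush
    }
  }
}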
On Sat, 21 Sep 2019, 04:13 Ankur Goenka <[email protected]> wrote:

> It seems that all the worker threads are blocked on reading data from
> PubSub.
>
> Can you try pulling a few Pub/Sub messages from your command line using
>
>   gcloud beta pubsub subscriptions pull <your subscription> --limit=1
>
> Example thread stack:
>
> "direct-runner-worker" #19 prio=5 os_prio=31 cpu=5955.37ms elapsed=1441.13s tid=0x00007fa12204d000 nid=0xa30f runnable [0x0000700002a55000]
>    java.lang.Thread.State: RUNNABLE
>     at java.net.SocketInputStream.socketRead0([email protected]/Native Method)
>     at java.net.SocketInputStream.socketRead([email protected]/SocketInputStream.java:115)
>     at java.net.SocketInputStream.read([email protected]/SocketInputStream.java:168)
>     at java.net.SocketInputStream.read([email protected]/SocketInputStream.java:140)
>     at sun.security.ssl.SSLSocketInputRecord.read([email protected]/SSLSocketInputRecord.java:448)
>     at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket([email protected]/SSLSocketInputRecord.java:68)
>     at sun.security.ssl.SSLSocketImpl.readApplicationRecord([email protected]/SSLSocketImpl.java:1104)
>     at sun.security.ssl.SSLSocketImpl$AppInputStream.read([email protected]/SSLSocketImpl.java:823)
>     - locked <0x0000000625ce3f28> (a sun.security.ssl.SSLSocketImpl$AppInputStream)
>     at java.io.BufferedInputStream.fill([email protected]/BufferedInputStream.java:252)
>     at java.io.BufferedInputStream.read1([email protected]/BufferedInputStream.java:292)
>     at java.io.BufferedInputStream.read([email protected]/BufferedInputStream.java:351)
>     - locked <0x0000000624b3ae28> (a java.io.BufferedInputStream)
>     at sun.net.www.http.HttpClient.parseHTTPHeader([email protected]/HttpClient.java:746)
>     at sun.net.www.http.HttpClient.parseHTTP([email protected]/HttpClient.java:689)
>     at sun.net.www.protocol.http.HttpURLConnection.getInputStream0([email protected]/HttpURLConnection.java:1604)
>     - locked <0x0000000625cdc8a8> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
>     at sun.net.www.protocol.http.HttpURLConnection.getInputStream([email protected]/HttpURLConnection.java:1509)
>     - locked <0x0000000625cdc8a8> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
>     at java.net.HttpURLConnection.getResponseCode([email protected]/HttpURLConnection.java:527)
>     at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode([email protected]/HttpsURLConnectionImpl.java:329)
>     at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
>     at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:143)
>     at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
>     at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1011)
>     at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:499)
>     at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432)
>     at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:549)
>     at org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient.pull(PubsubJsonClient.java:162)
>     at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubReader.pull(PubsubUnboundedSource.java:719)
>     at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubReader.advance(PubsubUnboundedSource.java:858)
>     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.startReader(UnboundedReadEvaluatorFactory.java:238)
>     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:133)
>     at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
>     at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
>     at java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:515)
>     at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1128)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:628)
>     at java.lang.Thread.run([email protected]/Thread.java:834)
>
> On Fri, Sep 20, 2019 at 11:44 AM Daniel Amadei <[email protected]> wrote:
>
>> Hi Ankur,
>>
>> I tried to do that, but nothing stood out to me as the culprit. Stack
>> trace attached in case you want to take a look.
>>
>> Thanks
>> Daniel
>>
>> On Fri, Sep 20, 2019 at 2:44 PM, Ankur Goenka <[email protected]> wrote:
>>
>>> On the surface, the pipeline looks fine.
>>> As this is a Java pipeline running locally, I would recommend checking
>>> the thread stack using jstack to see where the process is stuck.
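A quick note on capturing such a dump (standard JDK tooling, not from the
original thread):

  jps -l                      # list running JVMs with their main class, to find the pipeline's pid
  jstack <pid> > threads.txt  # dump every thread stack for that JVM

Here <pid> is whatever id jps reports for the pipeline process.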
>>> On Fri, Sep 20, 2019 at 10:39 AM Daniel Amadei <[email protected]> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm trying to run the following code with the direct runner, consuming
>>>> from a Pub/Sub topic and sending aggregated data to BigQuery. When run
>>>> locally using the DirectRunner, the code gets stuck after the window.
>>>>
>>>> If run with the DataflowRunner, it runs successfully.
>>>>
>>>> Any ideas?
>>>>
>>>> Thanks
>>>> Daniel Amadei
>>>>
>>>> import java.time.Instant;
>>>> import java.util.ArrayList;
>>>> import java.util.List;
>>>>
>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>> import org.apache.beam.sdk.Pipeline;
>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
>>>> import org.apache.beam.sdk.options.Default;
>>>> import org.apache.beam.sdk.options.Description;
>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>> import org.apache.beam.sdk.transforms.ParDo;
>>>> import org.apache.beam.sdk.transforms.Sum;
>>>> import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
>>>> import org.apache.beam.sdk.transforms.windowing.Window;
>>>> import org.joda.time.Duration;
>>>>
>>>> import com.google.api.services.bigquery.model.TableFieldSchema;
>>>> import com.google.api.services.bigquery.model.TableRow;
>>>> import com.google.api.services.bigquery.model.TableSchema;
>>>>
>>>> import org.slf4j.Logger;
>>>> import org.slf4j.LoggerFactory;
>>>>
>>>> public class StreamDemoConsumer {
>>>>
>>>>   private static final Logger LOG = LoggerFactory.getLogger(StreamDemoConsumer.class);
>>>>
>>>>   public static interface MyOptions extends DataflowPipelineOptions {
>>>>     @Description("Output BigQuery table <project_id>:<dataset_id>.<table_id>")
>>>>     @Default.String("<project-id>:demos.streamdemo")
>>>>     String getOutput();
>>>>
>>>>     void setOutput(String s);
>>>>
>>>>     @Description("Input topic")
>>>>     @Default.String("projects/<project-id>/topics/streamdemo")
>>>>     String getInput();
>>>>
>>>>     void setInput(String s);
>>>>   }
>>>>
>>>>   @SuppressWarnings("serial")
>>>>   public static void main(String[] args) {
>>>>     MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
>>>>     options.setStreaming(true);
>>>>     Pipeline p = Pipeline.create(options);
>>>>
>>>>     String topic = options.getInput();
>>>>     String output = options.getOutput();
>>>>
>>>>     // Build the table schema for the output table.
>>>>     List<TableFieldSchema> fields = new ArrayList<>();
>>>>     fields.add(new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"));
>>>>     fields.add(new TableFieldSchema().setName("num_words").setType("INTEGER"));
>>>>     TableSchema schema = new TableSchema().setFields(fields);
>>>>
>>>>     p //
>>>>         .apply("GetMessages", PubsubIO.readStrings().fromTopic(topic)) //
>>>>         .apply("window", Window.into(SlidingWindows //
>>>>             .of(Duration.standardMinutes(2)) //
>>>>             .every(Duration.standardSeconds(30)))) //
>>>>         .apply("WordsPerLine", ParDo.of(new DoFn<String, Integer>() {
>>>>           @ProcessElement
>>>>           public void processElement(ProcessContext c) throws Exception {
>>>>             String line = c.element();
>>>>             LOG.info(line);
>>>>             c.output(line.split(" ").length);
>>>>           }
>>>>         })) //
>>>>         .apply("WordsInTimeWindow", Sum.integersGlobally().withoutDefaults()) //
>>>>         .apply("ToBQRow", ParDo.of(new DoFn<Integer, TableRow>() {
>>>>           @ProcessElement
>>>>           public void processElement(ProcessContext c) throws Exception {
>>>>             LOG.info(c.element().toString());
>>>>
>>>>             TableRow row = new TableRow();
>>>>             row.set("timestamp", Instant.now().toString());
>>>>             row.set("num_words", c.element());
>>>>             c.output(row);
>>>>           }
>>>>         })) //
>>>>         .apply(BigQueryIO.writeTableRows().to(output) //
>>>>             .withSchema(schema) //
>>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
>>>>
>>>>     p.run();
>>>>   }
>>>> }
>>>>
>>>
>>
>> --
>> Regards,
>> Daniel Amadei (http://www.amadei.com.br)
>>
>
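Following up on Reza's low-QPS point above: for deterministic local testing
of a windowed pipeline like this, Beam's TestStream
(org.apache.beam.sdk.testing.TestStream, supported by the DirectRunner)
advances the watermark explicitly instead of waiting on real Pub/Sub
traffic. A minimal sketch that could stand in for the PubsubIO read in a
test; the elements and timestamps here are invented:

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

TestStream<String> source =
    TestStream.create(StringUtf8Coder.of())
        // Two lines at known event times.
        .addElements(
            TimestampedValue.of("hello world", new Instant(0)),
            TimestampedValue.of("one two three",
                new Instant(0).plus(Duration.standardSeconds(10))))
        // Move the watermark past the two-minute window so it can fire.
        .advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(3)))
        .advanceWatermarkToInfinity();

// p.apply(source) would then feed the same window / WordsPerLine / Sum /
// ToBQRow transforms as in the pipeline above.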
