It seems that all the worker threads are blocked on reading data from
Pub/Sub.
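As an aside, the same view that jstack gives can also be captured in-process. A minimal generic sketch (the class and method names here are illustrative, not part of Beam or its API):

```java
import java.util.Map;

// In-process equivalent of a jstack dump: print each live thread and its top
// frame. Threads that sit in socketRead0 across repeated dumps are the ones
// stuck waiting on the network.
public class ThreadDumpSketch {
    public static Map<Thread, StackTraceElement[]> dump() {
        Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
        for (Map.Entry<Thread, StackTraceElement[]> e : stacks.entrySet()) {
            StackTraceElement[] frames = e.getValue();
            System.out.println(e.getKey().getName()
                + (frames.length > 0 ? " -> " + frames[0] : " (no frames)"));
        }
        return stacks;
    }

    public static void main(String[] args) {
        dump();
    }
}
```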

Can you try pulling a few Pub/Sub messages from your command line using:

gcloud beta pubsub subscriptions pull <your subscription> --limit=1

Example thread stack

"direct-runner-worker" #19 prio=5 os_prio=31 cpu=5955.37ms elapsed=1441.13s tid=0x00007fa12204d000 nid=0xa30f runnable [0x0000700002a55000]
   java.lang.Thread.State: RUNNABLE
	at java.net.SocketInputStream.socketRead0([email protected]/Native Method)
	at java.net.SocketInputStream.socketRead([email protected]/SocketInputStream.java:115)
	at java.net.SocketInputStream.read([email protected]/SocketInputStream.java:168)
	at java.net.SocketInputStream.read([email protected]/SocketInputStream.java:140)
	at sun.security.ssl.SSLSocketInputRecord.read([email protected]/SSLSocketInputRecord.java:448)
	at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket([email protected]/SSLSocketInputRecord.java:68)
	at sun.security.ssl.SSLSocketImpl.readApplicationRecord([email protected]/SSLSocketImpl.java:1104)
	at sun.security.ssl.SSLSocketImpl$AppInputStream.read([email protected]/SSLSocketImpl.java:823)
	- locked <0x0000000625ce3f28> (a sun.security.ssl.SSLSocketImpl$AppInputStream)
	at java.io.BufferedInputStream.fill([email protected]/BufferedInputStream.java:252)
	at java.io.BufferedInputStream.read1([email protected]/BufferedInputStream.java:292)
	at java.io.BufferedInputStream.read([email protected]/BufferedInputStream.java:351)
	- locked <0x0000000624b3ae28> (a java.io.BufferedInputStream)
	at sun.net.www.http.HttpClient.parseHTTPHeader([email protected]/HttpClient.java:746)
	at sun.net.www.http.HttpClient.parseHTTP([email protected]/HttpClient.java:689)
	at sun.net.www.protocol.http.HttpURLConnection.getInputStream0([email protected]/HttpURLConnection.java:1604)
	- locked <0x0000000625cdc8a8> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
	at sun.net.www.protocol.http.HttpURLConnection.getInputStream([email protected]/HttpURLConnection.java:1509)
	- locked <0x0000000625cdc8a8> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
	at java.net.HttpURLConnection.getResponseCode([email protected]/HttpURLConnection.java:527)
	at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode([email protected]/HttpsURLConnectionImpl.java:329)
	at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
	at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:143)
	at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
	at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1011)
	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:499)
	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432)
	at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:549)
	at org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient.pull(PubsubJsonClient.java:162)
	at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubReader.pull(PubsubUnboundedSource.java:719)
	at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubReader.advance(PubsubUnboundedSource.java:858)
	at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.startReader(UnboundedReadEvaluatorFactory.java:238)
	at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:133)
	at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
	at java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:515)
	at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
	at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1128)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:628)
	at java.lang.Thread.run([email protected]/Thread.java:834)
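The thread is parked in socketRead0 under getResponseCode(), and it will wait there for as long as the socket's read timeout allows (indefinitely if none is set). As a generic, self-contained illustration (not Beam-specific; class and method names are mine), a read timeout turns such a stall into a visible SocketTimeoutException instead of a silent hang:

```java
import com.sun.net.httpserver.HttpServer;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URL;

// Shows how setReadTimeout makes a stalled HTTP read fail fast rather than
// blocking the thread in socketRead0.
public class ReadTimeoutSketch {
    public static boolean timesOut() throws Exception {
        // Local server that accepts the connection but delays its response.
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
        server.createContext("/slow", exchange -> {
            try { Thread.sleep(3_000); } catch (InterruptedException ignored) {}
            exchange.sendResponseHeaders(200, -1);
            exchange.close();
        });
        server.start();
        try {
            URL url = new URL("http://localhost:"
                + server.getAddress().getPort() + "/slow");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setReadTimeout(500);  // without this, the read can block indefinitely
            try {
                conn.getResponseCode();
                return false;          // unexpectedly got a response in time
            } catch (SocketTimeoutException expected) {
                return true;           // the stall surfaced as an exception
            }
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("timed out: " + timesOut());
    }
}
```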

On Fri, Sep 20, 2019 at 11:44 AM Daniel Amadei <[email protected]>
wrote:

> Hi Ankur,
>
> I tried that, but nothing jumped out at me as the culprit. The stack trace
> is attached in case you want to take a look.
>
> Thanks
> Daniel
>
> On Fri, Sep 20, 2019 at 14:44, Ankur Goenka <[email protected]>
> wrote:
>
>> On the surface, the pipeline looks fine.
>> Since this is a Java pipeline running locally, I would recommend checking
>> the thread stacks with jstack to see where the process is stuck.
>>
>>
>>
>> On Fri, Sep 20, 2019 at 10:39 AM Daniel Amadei <[email protected]>
>> wrote:
>>
>>> Hi all,
>>>
>>> I'm trying to run the following code, consuming from a Pub/Sub topic and
>>> writing aggregated data to BigQuery. When run locally with the
>>> DirectRunner, the code gets stuck after the window.
>>>
>>> When run with the DataflowRunner, it completes successfully.
>>>
>>> Any ideas?
>>>
>>> Thanks
>>> Daniel Amadei
>>>
>>>
>>> import java.time.Instant;
>>> import java.util.ArrayList;
>>> import java.util.List;
>>>
>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>> import org.apache.beam.sdk.Pipeline;
>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
>>> import org.apache.beam.sdk.options.Default;
>>> import org.apache.beam.sdk.options.Description;
>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>> import org.apache.beam.sdk.transforms.DoFn;
>>> import org.apache.beam.sdk.transforms.ParDo;
>>> import org.apache.beam.sdk.transforms.Sum;
>>> import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
>>> import org.apache.beam.sdk.transforms.windowing.Window;
>>> import org.joda.time.Duration;
>>>
>>> import com.google.api.services.bigquery.model.TableFieldSchema;
>>> import com.google.api.services.bigquery.model.TableRow;
>>> import com.google.api.services.bigquery.model.TableSchema;
>>>
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> public class StreamDemoConsumer {
>>>
>>>   private static final Logger LOG =
>>>       LoggerFactory.getLogger(StreamDemoConsumer.class);
>>>
>>>   public static interface MyOptions extends DataflowPipelineOptions {
>>>     @Description("Output BigQuery table <project_id>:<dataset_id>.<table_id>")
>>>     @Default.String("<project-id>:demos.streamdemo")
>>>     String getOutput();
>>>
>>>     void setOutput(String s);
>>>
>>>     @Description("Input topic")
>>>     @Default.String("projects/<project-id>/topics/streamdemo")
>>>     String getInput();
>>>
>>>     void setInput(String s);
>>>   }
>>>
>>>   @SuppressWarnings("serial")
>>>   public static void main(String[] args) {
>>>     MyOptions options =
>>>         PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
>>>     options.setStreaming(true);
>>>     Pipeline p = Pipeline.create(options);
>>>
>>>     String topic = options.getInput();
>>>     String output = options.getOutput();
>>>
>>>     // Build the table schema for the output table.
>>>     List<TableFieldSchema> fields = new ArrayList<>();
>>>     fields.add(new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"));
>>>     fields.add(new TableFieldSchema().setName("num_words").setType("INTEGER"));
>>>     TableSchema schema = new TableSchema().setFields(fields);
>>>
>>>     p //
>>>         .apply("GetMessages", PubsubIO.readStrings().fromTopic(topic)) //
>>>         .apply("window", Window.into(SlidingWindows //
>>>             .of(Duration.standardMinutes(2)) //
>>>             .every(Duration.standardSeconds(30)))) //
>>>         .apply("WordsPerLine", ParDo.of(new DoFn<String, Integer>() {
>>>           @ProcessElement
>>>           public void processElement(ProcessContext c) throws Exception {
>>>             String line = c.element();
>>>             LOG.info(line);
>>>             c.output(line.split(" ").length);
>>>           }
>>>         })) //
>>>         .apply("WordsInTimeWindow", Sum.integersGlobally().withoutDefaults()) //
>>>         .apply("ToBQRow", ParDo.of(new DoFn<Integer, TableRow>() {
>>>           @ProcessElement
>>>           public void processElement(ProcessContext c) throws Exception {
>>>             LOG.info(c.element().toString());
>>>
>>>             TableRow row = new TableRow();
>>>             row.set("timestamp", Instant.now().toString());
>>>             row.set("num_words", c.element());
>>>             c.output(row);
>>>           }
>>>         })) //
>>>         .apply(BigQueryIO.writeTableRows().to(output) //
>>>             .withSchema(schema) //
>>>             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
>>>
>>>     p.run();
>>>   }
>>> }
>>>
>>
>
> --
> Regards,
> Daniel Amadei (http://www.amadei.com.br)
>
