That's the case. I'll try with more volume. Thanks

On Fri, Sep 20, 2019 at 23:16, Reza Rokni <[email protected]> wrote:

> Hi,
>
> Are you using a very small volume of PubSub messages for testing
> purposes, for example manually sending just one message to see it work its
> way through?
>
> I don't have the Jira number to hand, but if I recall correctly, the PubsubIO
> connector for the DirectRunner had an issue with advancing the watermark in
> testing scenarios with very low QPS.
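Why a stalled watermark looks like a pipeline "stuck after the window": with the default trigger, a window's aggregate is emitted only once the watermark passes the end of that window. The toy model below, in plain Java with no Beam dependency, shows just that rule. The class and method names are hypothetical, and it does not attempt to model how PubsubIO actually estimates its watermark.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, not Beam code: which windows would the default trigger have emitted? */
final class WatermarkModel {

    /** A half-open event-time window [startMs, endMs), in epoch milliseconds. */
    static final class Window {
        final long startMs;
        final long endMs;

        Window(long startMs, long endMs) {
            this.startMs = startMs;
            this.endMs = endMs;
        }
    }

    /**
     * Under the default trigger a window emits once the watermark reaches its end;
     * windows that end after the current watermark have produced no output yet.
     */
    static List<Window> firedWindows(List<Window> windows, long watermarkMs) {
        List<Window> fired = new ArrayList<>();
        for (Window w : windows) {
            if (watermarkMs >= w.endMs) {
                fired.add(w);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        List<Window> windows = List.of(new Window(0, 120_000), new Window(30_000, 150_000));
        // Watermark held at 60 s (the source is not advancing it): prints 0.
        System.out.println(firedWindows(windows, 60_000).size());
        // Watermark at 130 s: only the window ending at 120 s has fired, prints 1.
        System.out.println(firedWindows(windows, 130_000).size());
    }
}
```

However many messages arrive, nothing downstream of the window runs until the watermark moves, which matches the symptom reported in this thread.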
>
> If that is the case here, could you try with some more volume on the
> PubSub topic?
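One low-effort way to put more volume on the topic is to call the gcloud CLI in a loop. The sketch below assumes `gcloud` is installed and authenticated and that the topic is the `streamdemo` topic from the original pipeline; the `PublishBurst` class and its methods are hypothetical and are not part of Beam or the Pub/Sub client library.

```java
import java.io.IOException;
import java.util.List;

/** Hypothetical helper: publish a burst of test messages through the gcloud CLI. */
final class PublishBurst {

    /** Builds the argument list for one `gcloud pubsub topics publish` call. */
    static List<String> publishCommand(String topic, String message) {
        return List.of("gcloud", "pubsub", "topics", "publish", topic, "--message=" + message);
    }

    /** Publishes `count` short lines, running each gcloud call to completion in turn. */
    static void publish(String topic, int count) throws IOException, InterruptedException {
        for (int i = 0; i < count; i++) {
            Process proc = new ProcessBuilder(publishCommand(topic, "test line number " + i))
                    .inheritIO()
                    .start();
            int status = proc.waitFor();
            if (status != 0) {
                throw new IOException("gcloud exited with status " + status);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Example: java PublishBurst streamdemo 200
        publish(args[0], Integer.parseInt(args[1]));
    }
}
```

Starting one gcloud process per message is slow, but for a local smoke test a few hundred messages spread over a few minutes is usually enough to tell whether the windows start firing.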
>
> Cheers
> Reza
>
>
>
>
> On Sat, 21 Sep 2019, 04:13 Ankur Goenka, <[email protected]> wrote:
>
>> It seems that all the worker threads are blocked on reading data from
>> PubSub.
>>
>> Can you try pulling a few Pub/Sub messages from the command line using:
>> gcloud beta pubsub subscriptions pull <your subscription> --limit=1
>>
>> Example thread stack
>>
>> "direct-runner-worker" #19 prio=5 os_prio=31 cpu=5955.37ms elapsed=1441.13s tid=0x00007fa12204d000 nid=0xa30f runnable  [0x0000700002a55000]
>>    java.lang.Thread.State: RUNNABLE
>> at java.net.SocketInputStream.socketRead0([email protected]/Native Method)
>> at java.net.SocketInputStream.socketRead([email protected]/SocketInputStream.java:115)
>> at java.net.SocketInputStream.read([email protected]/SocketInputStream.java:168)
>> at java.net.SocketInputStream.read([email protected]/SocketInputStream.java:140)
>> at sun.security.ssl.SSLSocketInputRecord.read([email protected]/SSLSocketInputRecord.java:448)
>> at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket([email protected]/SSLSocketInputRecord.java:68)
>> at sun.security.ssl.SSLSocketImpl.readApplicationRecord([email protected]/SSLSocketImpl.java:1104)
>> at sun.security.ssl.SSLSocketImpl$AppInputStream.read([email protected]/SSLSocketImpl.java:823)
>> - locked <0x0000000625ce3f28> (a sun.security.ssl.SSLSocketImpl$AppInputStream)
>> at java.io.BufferedInputStream.fill([email protected]/BufferedInputStream.java:252)
>> at java.io.BufferedInputStream.read1([email protected]/BufferedInputStream.java:292)
>> at java.io.BufferedInputStream.read([email protected]/BufferedInputStream.java:351)
>> - locked <0x0000000624b3ae28> (a java.io.BufferedInputStream)
>> at sun.net.www.http.HttpClient.parseHTTPHeader([email protected]/HttpClient.java:746)
>> at sun.net.www.http.HttpClient.parseHTTP([email protected]/HttpClient.java:689)
>> at sun.net.www.protocol.http.HttpURLConnection.getInputStream0([email protected]/HttpURLConnection.java:1604)
>> - locked <0x0000000625cdc8a8> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
>> at sun.net.www.protocol.http.HttpURLConnection.getInputStream([email protected]/HttpURLConnection.java:1509)
>> - locked <0x0000000625cdc8a8> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)
>> at java.net.HttpURLConnection.getResponseCode([email protected]/HttpURLConnection.java:527)
>> at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode([email protected]/HttpsURLConnectionImpl.java:329)
>> at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
>> at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:143)
>> at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:84)
>> at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1011)
>> at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:499)
>> at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432)
>> at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:549)
>> at org.apache.beam.sdk.io.gcp.pubsub.PubsubJsonClient.pull(PubsubJsonClient.java:162)
>> at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubReader.pull(PubsubUnboundedSource.java:719)
>> at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubReader.advance(PubsubUnboundedSource.java:858)
>> at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.startReader(UnboundedReadEvaluatorFactory.java:238)
>> at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:133)
>> at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
>> at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
>> at java.util.concurrent.Executors$RunnableAdapter.call([email protected]/Executors.java:515)
>> at java.util.concurrent.FutureTask.run([email protected]/FutureTask.java:264)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker([email protected]/ThreadPoolExecutor.java:1128)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run([email protected]/ThreadPoolExecutor.java:628)
>> at java.lang.Thread.run([email protected]/Thread.java:834)
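To confirm that every worker thread in a long dump is parked in the same place, and not just the one shown above, a small scan over the raw jstack text helps. A sketch assuming standard jstack layout (each thread block starts with a line beginning with the quoted thread name, and frames follow as `at ...` lines); `JstackScan` and its method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: find threads in a jstack dump whose frames mention a given method. */
final class JstackScan {

    /**
     * Returns one entry per thread whose name starts with namePrefix and whose stack
     * contains frameSubstring. Thread names may repeat, so the list size is the count.
     */
    static List<String> threadsInFrame(String dump, String namePrefix, String frameSubstring) {
        List<String> matches = new ArrayList<>();
        String currentName = null;
        boolean currentMatched = false;
        for (String raw : dump.split("\\R")) {
            String line = raw.strip();
            if (line.startsWith("\"")) {
                // New thread header: the name is the text between the first pair of quotes.
                int end = line.indexOf('"', 1);
                currentName = end > 0 ? line.substring(1, end) : null;
                currentMatched = false;
            } else if (currentName != null && !currentMatched
                    && currentName.startsWith(namePrefix)
                    && line.startsWith("at ") && line.contains(frameSubstring)) {
                matches.add(currentName);
                currentMatched = true; // count each thread at most once
            }
        }
        return matches;
    }
}
```

Comparing `threadsInFrame(dump, "direct-runner-worker", "PubsubJsonClient.pull").size()` with the total number of `direct-runner-worker` threads shows whether all of them are waiting on Pub/Sub.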
>>
>> On Fri, Sep 20, 2019 at 11:44 AM Daniel Amadei <[email protected]> wrote:
>>
>>> Hi Ankur,
>>>
>>> I tried that, but nothing stood out to me as the culprit. The stack
>>> trace is attached in case you want to take a look.
>>>
>>> Thanks
>>> Daniel
>>>
>>> On Fri, Sep 20, 2019 at 14:44, Ankur Goenka <[email protected]> wrote:
>>>
>>>> On the surface, the pipeline looks fine.
>>>> As this is a Java pipeline running locally, I would recommend checking
>>>> the thread stacks with jstack to see where the process is stuck.
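If attaching jstack to the local process is inconvenient, the same information can be produced from inside the JVM with the standard `Thread.getAllStackTraces()` API, for example from a timer that logs it periodically. A minimal sketch; the `ThreadDumper` name and filtering on the `direct-runner-worker` thread name (taken from the dump in this thread) are assumptions, not anything Beam provides.

```java
import java.util.Map;

/** Hypothetical helper: render the stacks of live threads whose name starts with a prefix. */
final class ThreadDumper {

    static String dump(String namePrefix) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            if (!t.getName().startsWith(namePrefix)) {
                continue; // skip threads we are not interested in
            }
            // Header line loosely modelled on jstack: quoted name plus thread state.
            out.append('"').append(t.getName()).append("\" state=").append(t.getState()).append('\n');
            for (StackTraceElement frame : e.getValue()) {
                out.append("\tat ").append(frame).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }
}
```

Calling `LOG.info(ThreadDumper.dump("direct-runner-worker"))` once a minute would show whether the workers stay in the Pub/Sub pull or move on.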
>>>>
>>>>
>>>>
>>>> On Fri, Sep 20, 2019 at 10:39 AM Daniel Amadei <[email protected]> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm trying to run the following code with the DirectRunner, consuming
>>>>> from a Pub/Sub topic and sending aggregated data to BigQuery. When run
>>>>> locally with the DirectRunner, the pipeline gets stuck after the window step.
>>>>>
>>>>> When run with the DataflowRunner, it works.
>>>>>
>>>>> Any ideas?
>>>>>
>>>>> Thanks
>>>>> Daniel Amadei
>>>>>
>>>>>
>>>>> import java.time.Instant;
>>>>> import java.util.ArrayList;
>>>>> import java.util.List;
>>>>>
>>>>> import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
>>>>> import org.apache.beam.sdk.Pipeline;
>>>>> import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
>>>>> import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
>>>>> import org.apache.beam.sdk.options.Default;
>>>>> import org.apache.beam.sdk.options.Description;
>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>> import org.apache.beam.sdk.transforms.ParDo;
>>>>> import org.apache.beam.sdk.transforms.Sum;
>>>>> import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
>>>>> import org.apache.beam.sdk.transforms.windowing.Window;
>>>>> import org.joda.time.Duration;
>>>>>
>>>>> import com.google.api.services.bigquery.model.TableFieldSchema;
>>>>> import com.google.api.services.bigquery.model.TableRow;
>>>>> import com.google.api.services.bigquery.model.TableSchema;
>>>>>
>>>>> import org.slf4j.Logger;
>>>>> import org.slf4j.LoggerFactory;
>>>>>
>>>>> public class StreamDemoConsumer {
>>>>>
>>>>>     private static final Logger LOG = LoggerFactory.getLogger(StreamDemoConsumer.class);
>>>>>
>>>>>     public static interface MyOptions extends DataflowPipelineOptions {
>>>>>         @Description("Output BigQuery table <project_id>:<dataset_id>.<table_id>")
>>>>>         @Default.String("<project-id>:demos.streamdemo")
>>>>>         String getOutput();
>>>>>
>>>>>         void setOutput(String s);
>>>>>
>>>>>         @Description("Input topic")
>>>>>         @Default.String("projects/<project-id>/topics/streamdemo")
>>>>>         String getInput();
>>>>>
>>>>>         void setInput(String s);
>>>>>     }
>>>>>
>>>>>     @SuppressWarnings("serial")
>>>>>     public static void main(String[] args) {
>>>>>         MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
>>>>>         options.setStreaming(true);
>>>>>         Pipeline p = Pipeline.create(options);
>>>>>
>>>>>         String topic = options.getInput();
>>>>>         String output = options.getOutput();
>>>>>
>>>>>         // Build the table schema for the output table.
>>>>>         List<TableFieldSchema> fields = new ArrayList<>();
>>>>>         fields.add(new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"));
>>>>>         fields.add(new TableFieldSchema().setName("num_words").setType("INTEGER"));
>>>>>         TableSchema schema = new TableSchema().setFields(fields);
>>>>>
>>>>>         p //
>>>>>             .apply("GetMessages", PubsubIO.readStrings().fromTopic(topic)) //
>>>>>             // Each element lands in 2 min / 30 s = 4 overlapping windows.
>>>>>             .apply("window", Window.into(SlidingWindows //
>>>>>                 .of(Duration.standardMinutes(2)) //
>>>>>                 .every(Duration.standardSeconds(30)))) //
>>>>>             .apply("WordsPerLine", ParDo.of(new DoFn<String, Integer>() {
>>>>>                 @ProcessElement
>>>>>                 public void processElement(ProcessContext c) throws Exception {
>>>>>                     String line = c.element();
>>>>>                     LOG.info(line);
>>>>>                     c.output(line.split(" ").length);
>>>>>                 }
>>>>>             })) //
>>>>>             // One sum per window, emitted when the watermark passes the window end.
>>>>>             .apply("WordsInTimeWindow", Sum.integersGlobally().withoutDefaults()) //
>>>>>             .apply("ToBQRow", ParDo.of(new DoFn<Integer, TableRow>() {
>>>>>                 @ProcessElement
>>>>>                 public void processElement(ProcessContext c) throws Exception {
>>>>>                     LOG.info(c.element().toString());
>>>>>
>>>>>                     TableRow row = new TableRow();
>>>>>                     // Instant.now() is processing time, not the end of the summed window.
>>>>>                     row.set("timestamp", Instant.now().toString());
>>>>>                     row.set("num_words", c.element());
>>>>>                     c.output(row);
>>>>>                 }
>>>>>             })) //
>>>>>             .apply(BigQueryIO.writeTableRows().to(output) //
>>>>>                 .withSchema(schema) //
>>>>>                 .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
>>>>>                 .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
>>>>>
>>>>>         p.run();
>>>>>     }
>>>>> }
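To reason about what output to expect once the watermark does advance, here is a plain-Java model of the windowing and summing the pipeline above performs. It assumes window starts aligned to multiples of the 30-second period (offset 0) and non-negative event timestamps in milliseconds; `SlidingWordCount` is a hypothetical illustration of sliding-window assignment, not Beam's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java model of SlidingWindows(2 min, every 30 s) followed by a per-window sum. */
final class SlidingWordCount {
    static final long SIZE_MS = 2 * 60_000;  // Duration.standardMinutes(2)
    static final long PERIOD_MS = 30_000;    // Duration.standardSeconds(30)

    /** Start times of every window [start, start + SIZE_MS) that contains timestampMs (>= 0 assumed). */
    static List<Long> windowStarts(long timestampMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - (timestampMs % PERIOD_MS);
        // Walk back one period at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - SIZE_MS; start -= PERIOD_MS) {
            starts.add(start);
        }
        return starts;
    }

    /** Words per window start, mirroring WordsPerLine followed by Sum.integersGlobally(). */
    static Map<Long, Integer> wordsPerWindow(Map<Long, String> linesByTimestamp) {
        Map<Long, Integer> sums = new TreeMap<>();
        for (Map.Entry<Long, String> e : linesByTimestamp.entrySet()) {
            int words = e.getValue().split(" ").length;
            for (long start : windowStarts(e.getKey())) {
                sums.merge(start, words, Integer::sum);
            }
        }
        return sums;
    }
}
```

Because every line falls into four overlapping windows, a single test message should eventually produce four BigQuery rows, one per window, and windows that received no data produce nothing because of `withoutDefaults()`.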
>>>>>
>>>>
>>>
>>> --
>>> Best regards,
>>> Daniel Amadei (http://www.amadei.com.br)
>>>
>>

-- 
Best regards,
Daniel Amadei (http://www.amadei.com.br)
