Re: Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-24 Thread Dongwon Kim
Thanks Reuven for the input, and thanks Rui for CC'ing Reuven.

Generally you should not rely on PCollection being ordered

Is it because Beam splits a PCollection into multiple input splits and tries
to process them as efficiently as possible without considering timestamps?
This is confusing to me as I've been using Flink for a long time; AFAIK, the
Flink DataStream API guarantees ordering for the same key between two
different tasks.

Best,

Dongwon

On Tue, Aug 25, 2020 at 12:56 AM Reuven Lax  wrote:

> Generally you should not rely on PCollection being ordered, though there
> have been discussions about adding some time-ordering semantics.
>
>
>
> On Sun, Aug 23, 2020 at 9:06 PM Rui Wang  wrote:
>
>> The current Beam model does not guarantee an ordering after a GBK (i.e.
>> Combine.perKey() in your case). So you cannot expect that the C step sees
>> elements in a specific order.
>>
>> As I recall, on the Dataflow runner there is very limited ordering support.
>> Hi +Reuven Lax, can you share your insights about it?
>>
>>
>> -Rui
>>
>>
>>
>> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim 
>> wrote:
>>
>>> Hi,
>>>
>>> My Beam pipeline is designed to work with an unbounded source KafkaIO.
>>> It roughly looks like below:
>>> p.apply(KafkaIO.read() ...)   // (A-1)
>>>   .apply(WithKeys.of(...).withKeyType(...))
>>>   .apply(Window.into(FixedWindows.of(...)))
>>>   .apply(Combine.perKey(...))  // (B)
>>>   .apply(Window.into(new GlobalWindows())) // to have per-key stats
>>> in (C)
>>>   .apply(ParDo.of(new MyStatefulDoFn()))  // (C)
>>> Note that (C) has its own state which is expected to be fetched and
>>> updated by window results (B) in order of event-time.
>>>
>>> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>>>
 p.apply(TextIO.read().from("test.txt"))  // (A-2)
>>>
>>> "test.txt" contains samples having a single key.
>>>
>>> I get a wrong result, and it turns out that the window results weren't fed
>>> into (C) in order.
>>> Is it because (A-2) makes the pipeline a bounded one?
>>>
>>> Q1. How to prevent this from happening?
>>> Q2. How do you guys usually write an integration test for an unbounded
>>> one with stateful function?
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>


Re: Need Support for ElasticSearch 7.x for beam

2020-08-24 Thread Mohil Khare
hello Kyle,

Thanks a lot for your prompt reply. Hmm, strange: I think I was getting
an error that the version was not supported. I'm not sure whether I had
updated my Beam version the last time I tried ES 7.x.
Let me try it again and I'll let you know.

Thanks and Regards
Mohil

On Mon, Aug 24, 2020 at 11:54 AM Kyle Weaver  wrote:

> This ticket indicates Elasticsearch 7.x has been supported since Beam
> 2.19: https://issues.apache.org/jira/browse/BEAM-5192
>
> Are there any specific features you need that aren't supported?
>
> On Mon, Aug 24, 2020 at 11:33 AM Mohil Khare  wrote:
>
>> Hello,
>>
>> Firstly, I am on Java SDK 2.23.0, and we heavily use Elasticsearch as one
>> of our sinks.
>>
>> It's been a while since Beam was upgraded to support Elasticsearch
>> versions greater than 6.x.
>> Elasticsearch has now moved on to 7.x, and we want to use some of its
>> new security features.
>>
>> I want to know which version of Beam will support Elasticsearch version
>> 7.x.
>> I see the following commit, but it looks like it hasn't been merged into
>> master yet.
>>
>> https://github.com/apache/beam/pull/10023
>>
>>
>> Thanks and regards
>> Mohil
>>
>>
>>
>>


Re: How to integrate Beam SQL windowing query with KafkaIO?

2020-08-24 Thread Rui Wang
Hi,

I checked the query in your SO question, and I think the SQL usage is
correct.

My current guess is that the problem is how the watermark is generated and
advanced in KafkaIO. Either the watermark didn't pass the end of
your SQL window for aggregation, or the data was lagging behind the
watermark and was therefore treated as late data.

One way to verify this is to use TestStream as the source to
evaluate your pipeline and see whether it works as expected.
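Rui's two hypotheses (the watermark never passing the window end, vs. data arriving behind the watermark) can be stated as two simple predicates. The following is only a Beam-free Java sketch of the semantics involved, not the actual KafkaIO watermark logic; the millisecond values are made up for illustration:

```java
public class WatermarkSim {
    // A fixed window [start, end) fires (with the default trigger) only once
    // the watermark has passed the end of the window.
    static boolean windowCanFire(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs;
    }

    // An element is late if its event timestamp is behind the watermark; with
    // zero allowed lateness such elements may be dropped.
    static boolean isLate(long eventTsMs, long watermarkMs) {
        return eventTsMs < watermarkMs;
    }

    public static void main(String[] args) {
        long watermark = 120_000; // watermark at 00:02:00
        System.out.println(windowCanFire(60_000, watermark));  // true: window [0, 60s) can emit
        System.out.println(windowCanFire(180_000, watermark)); // false: aggregation never emits
        System.out.println(isLate(30_000, watermark));         // true: treated as late data
    }
}
```

If the watermark in KafkaIO never advances (e.g. an idle partition), `windowCanFire` stays false forever and the SQL aggregation produces no output; if records carry timestamps older than the watermark, `isLate` is true and they are silently discarded.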

-Rui

On Mon, Aug 24, 2020 at 11:06 AM Minreng Wu  wrote:

> Hi contributors,
>
> Sorry to bother you! I ran into a problem while trying to apply a
> windowing aggregation Beam SQL query to a Kafka input source.
>
> The details of the question are in the following link:
> https://stackoverflow.com/questions/63566057/how-to-integrate-beam-sql-windowing-query-with-kafkaio.
> And the version of the Beam Java SDK I used is *2.23.0*
>
> Really appreciate your help and advice! Stay safe and happy!
>
> Thanks and regards,
> Minreng
>


Re: Need Support for ElasticSearch 7.x for beam

2020-08-24 Thread Kyle Weaver
This ticket indicates Elasticsearch 7.x has been supported since Beam 2.19:
https://issues.apache.org/jira/browse/BEAM-5192

Are there any specific features you need that aren't supported?
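If it helps, on Beam 2.19+ the Elasticsearch connector ships in the `beam-sdks-java-io-elasticsearch` artifact; a Maven dependency matching your 2.23.0 setup would look roughly like this (a sketch — please verify the coordinates against Maven Central):

```xml
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
  <version>2.23.0</version>
</dependency>
```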

On Mon, Aug 24, 2020 at 11:33 AM Mohil Khare  wrote:

> Hello,
>
> Firstly, I am on Java SDK 2.23.0, and we heavily use Elasticsearch as one of
> our sinks.
>
> It's been a while since Beam was upgraded to support Elasticsearch versions
> greater than 6.x.
> Elasticsearch has now moved on to 7.x, and we want to use some of its new
> security features.
>
> I want to know which version of Beam will support Elasticsearch version
> 7.x.
> I see the following commit, but it looks like it hasn't been merged into
> master yet.
>
> https://github.com/apache/beam/pull/10023
>
>
> Thanks and regards
> Mohil
>
>
>
>


Need Support for ElasticSearch 7.x for beam

2020-08-24 Thread Mohil Khare
Hello,

Firstly, I am on Java SDK 2.23.0, and we heavily use Elasticsearch as one of
our sinks.

It's been a while since Beam was upgraded to support Elasticsearch versions
greater than 6.x.
Elasticsearch has now moved on to 7.x, and we want to use some of its new
security features.

I want to know which version of Beam will support Elasticsearch version 7.x.
I see the following commit, but it looks like it hasn't been merged into
master yet.

https://github.com/apache/beam/pull/10023


Thanks and regards
Mohil


How to integrate Beam SQL windowing query with KafkaIO?

2020-08-24 Thread Minreng Wu
Hi contributors,

Sorry to bother you! I ran into a problem while trying to apply a windowing
aggregation Beam SQL query to a Kafka input source.

The details of the question are in the following link:
https://stackoverflow.com/questions/63566057/how-to-integrate-beam-sql-windowing-query-with-kafkaio.
And the version of the Beam Java SDK I used is *2.23.0*

Really appreciate your help and advice! Stay safe and happy!

Thanks and regards,
Minreng


Re: Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-24 Thread Robert Bradshaw
As for the question of writing tests in the face of non-determinism,
you should look into TestStream. MyStatefulDoFn still needs to be
updated to not assume an ordering. (This can be done by setting timers
that provide guarantees that (modulo late data) one has seen all data
up to a certain timestamp.)
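The timer-based buffering Robert describes can be sketched runner-agnostically: hold elements keyed by event timestamp and release them in timestamp order only once the watermark has passed them. This is a plain-Java sketch of the idea, not the actual Beam state/timer API (in a real MyStatefulDoFn you would use something like a `BagState` plus an event-time `Timer`):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OrderedBuffer {
    // Elements buffered by event timestamp; TreeMap keeps keys sorted.
    private final TreeMap<Long, List<String>> buffer = new TreeMap<>();

    // Buffer an element under its event timestamp (may arrive out of order).
    public void add(long timestampMs, String element) {
        buffer.computeIfAbsent(timestampMs, t -> new ArrayList<>()).add(element);
    }

    // When the watermark advances, emit everything at or below it, in order.
    // Modulo late data, no earlier element can still arrive.
    public List<String> onWatermark(long watermarkMs) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Long, List<String>>> it =
            buffer.headMap(watermarkMs, true).entrySet().iterator();
        while (it.hasNext()) {
            out.addAll(it.next().getValue());
            it.remove();
        }
        return out;
    }

    public static void main(String[] args) {
        OrderedBuffer b = new OrderedBuffer();
        b.add(20_000, "second-window-result");
        b.add(10_000, "first-window-result");   // arrives later, but is earlier
        System.out.println(b.onWatermark(15_000)); // releases only the t=10s result
        System.out.println(b.onWatermark(25_000)); // then the t=20s result
    }
}
```

With this pattern the stateful step no longer depends on the order in which the window results from (B) happen to arrive, which is exactly what makes it testable with TestStream.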

On Mon, Aug 24, 2020 at 8:56 AM Reuven Lax  wrote:
>
> Generally you should not rely on PCollection being ordered, though there have 
> been discussions about adding some time-ordering semantics.
>
>
>
> On Sun, Aug 23, 2020 at 9:06 PM Rui Wang  wrote:
>>
>> The current Beam model does not guarantee an ordering after a GBK (i.e.
>> Combine.perKey() in your case). So you cannot expect that the C step sees
>> elements in a specific order.
>>
>> As I recall, on the Dataflow runner there is very limited ordering support.
>> Hi +Reuven Lax, can you share your insights about it?
>>
>>
>> -Rui
>>
>>
>>
>> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim  wrote:
>>>
>>> Hi,
>>>
>>> My Beam pipeline is designed to work with an unbounded source KafkaIO.
>>> It roughly looks like below:
>>> p.apply(KafkaIO.read() ...)   // (A-1)
>>>   .apply(WithKeys.of(...).withKeyType(...))
>>>   .apply(Window.into(FixedWindows.of(...)))
>>>   .apply(Combine.perKey(...))  // (B)
>>>   .apply(Window.into(new GlobalWindows())) // to have per-key stats in 
>>> (C)
>>>   .apply(ParDo.of(new MyStatefulDoFn()))  // (C)
>>> Note that (C) has its own state which is expected to be fetched and updated 
>>> by window results (B) in order of event-time.
>>>
>>> Now I'm writing an integration test where (A-1) is replaced by (A-2):

 p.apply(TextIO.read().from("test.txt"))  // (A-2)
>>>
>>> "test.txt" contains samples having a single key.
>>>
>>> I get a wrong result, and it turns out that the window results weren't fed
>>> into (C) in order.
>>> Is it because (A-2) makes the pipeline a bounded one?
>>>
>>> Q1. How to prevent this from happening?
>>> Q2. How do you guys usually write an integration test for an unbounded one 
>>> with stateful function?
>>>
>>> Best,
>>>
>>> Dongwon


Re: tensorflow-data-validation(with direct runner) failed to process large data because of grpc timeout on workers

2020-08-24 Thread Luke Cwik
Another person reported something similar for Dataflow; in their scenario they
were using locks and either got into a deadlock or starved processing for long
enough that the watchdog fired. Are you using locks and/or do you have really
long single-element processing times?

On Mon, Aug 24, 2020 at 1:50 AM Junjian Xu  wrote:

> Hi,
>
> I’m running into a problem using tensorflow-data-validation with the direct
> runner to generate statistics from some large datasets over 400GB.
>
> It seems that all workers stopped working after an error message of
> “Keepalive watchdog fired. Closing transport.” It seems to be a grpc
> keepalive timeout.
>
> ```
> E0804 17:49:07.419950276   44806 chttp2_transport.cc:2881]
> ipv6:[::1]:40823: Keepalive watchdog fired. Closing transport.
> 2020-08-04 17:49:07  local_job_service.py : INFO  Worker: severity: ERROR
> timestamp {   seconds: 1596563347   nanos: 420487403 } message: "Python sdk
> harness failed: \nTraceback (most recent call last):\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py\",
> line 158, in main\n
>  sdk_pipeline_options.view_as(ProfilingOptions))).run()\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py\",
> line 213, in run\nfor work_request in
> self._control_stub.Control(get_responses()):\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line
> 416, in __next__\nreturn self._next()\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line
> 706, in _next\nraise self\ngrpc._channel._MultiThreadedRendezvous:
> <_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus =
> StatusCode.UNAVAILABLE\n\tdetails = \"keepalive watchdog
> timeout\"\n\tdebug_error_string =
> \"{\"created\":\"@1596563347.420024732\",\"description\":\"Error received
> from peer
> ipv6:[::1]:40823\",\"file\":\"src/core/lib/surface/call.cc\",\"file_line\":1055,\"grpc_message\":\"keepalive
> watchdog timeout\",\"grpc_status\":14}\"\n>" trace: "Traceback (most recent
> call last):\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py\",
> line 158, in main\n
>  sdk_pipeline_options.view_as(ProfilingOptions))).run()\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py\",
> line 213, in run\nfor work_request in
> self._control_stub.Control(get_responses()):\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line
> 416, in __next__\nreturn self._next()\n  File
> \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line
> 706, in _next\nraise self\ngrpc._channel._MultiThreadedRendezvous:
> <_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus =
> StatusCode.UNAVAILABLE\n\tdetails = \"keepalive watchdog
> timeout\"\n\tdebug_error_string =
> \"{\"created\":\"@1596563347.420024732\",\"description\":\"Error received
> from peer
> ipv6:[::1]:40823\",\"file\":\"src/core/lib/surface/call.cc\",\"file_line\":1055,\"grpc_message\":\"keepalive
> watchdog timeout\",\"grpc_status\":14}\"\n>\n" log_location:
> "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:161"
> thread: "MainThread"
> Traceback (most recent call last):
>   File "/usr/lib64/python3.7/runpy.py", line 193, in _run_module_as_main
> "__main__", mod_spec)
>   File "/usr/lib64/python3.7/runpy.py", line 85, in _run_code
> exec(code, run_globals)
>   File
> "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
> line 248, in <module>
> main(sys.argv)
>   File
> "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
> line 158, in main
> sdk_pipeline_options.view_as(ProfilingOptions))).run()
>   File
> "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 213, in run
> for work_request in self._control_stub.Control(get_responses()):
>   File "/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py",
> line 416, in __next__
> return self._next()
>   File "/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py",
> line 706, in _next
> raise self
> grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
> that terminated with:
> status = StatusCode.UNAVAILABLE
> details = "keepalive watchdog timeout"
> debug_error_string =
> "{"created":"@1596563347.420024732","description":"Error received from peer
> ipv6:[::1]:40823","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"keepalive
> watchdog timeout","grpc_status":14}"
> ```
>
> I originally raised the issue in tensorflow-data-validation community but
> we couldn't come up with any solution.
> https://github.com/tensorflow/data-validation/issues/133
>
> The Beam version is 2.22.0. Please let me know if I missed anything.

Re: Out-of-orderness of window results when testing stateful operators with TextIO

2020-08-24 Thread Reuven Lax
Generally you should not rely on PCollection being ordered, though there
have been discussions about adding some time-ordering semantics.



On Sun, Aug 23, 2020 at 9:06 PM Rui Wang  wrote:

> The current Beam model does not guarantee an ordering after a GBK (i.e.
> Combine.perKey() in your case). So you cannot expect that the C step sees
> elements in a specific order.
>
> As I recall, on the Dataflow runner there is very limited ordering support.
> Hi +Reuven Lax, can you share your insights about it?
>
>
> -Rui
>
>
>
> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim  wrote:
>
>> Hi,
>>
>> My Beam pipeline is designed to work with an unbounded source KafkaIO.
>> It roughly looks like below:
>> p.apply(KafkaIO.read() ...)   // (A-1)
>>   .apply(WithKeys.of(...).withKeyType(...))
>>   .apply(Window.into(FixedWindows.of(...)))
>>   .apply(Combine.perKey(...))  // (B)
>>   .apply(Window.into(new GlobalWindows())) // to have per-key stats
>> in (C)
>>   .apply(ParDo.of(new MyStatefulDoFn()))  // (C)
>> Note that (C) has its own state which is expected to be fetched and
>> updated by window results (B) in order of event-time.
>>
>> Now I'm writing an integration test where (A-1) is replaced by (A-2):
>>
>>> p.apply(TextIO.read().from("test.txt"))  // (A-2)
>>
>> "test.txt" contains samples having a single key.
>>
>> I get a wrong result, and it turns out that the window results weren't fed
>> into (C) in order.
>> Is it because (A-2) makes the pipeline a bounded one?
>>
>> Q1. How to prevent this from happening?
>> Q2. How do you guys usually write an integration test for an unbounded
>> one with stateful function?
>>
>> Best,
>>
>> Dongwon
>>
>


tensorflow-data-validation(with direct runner) failed to process large data because of grpc timeout on workers

2020-08-24 Thread Junjian Xu
Hi,

I’m running into a problem using tensorflow-data-validation with the direct
runner to generate statistics from some large datasets over 400GB.

It seems that all workers stopped working after an error message of
“Keepalive watchdog fired. Closing transport.” It seems to be a grpc
keepalive timeout.

```
E0804 17:49:07.419950276   44806 chttp2_transport.cc:2881]
ipv6:[::1]:40823: Keepalive watchdog fired. Closing transport.
2020-08-04 17:49:07  local_job_service.py : INFO  Worker: severity: ERROR
timestamp {   seconds: 1596563347   nanos: 420487403 } message: "Python sdk
harness failed: \nTraceback (most recent call last):\n  File
\"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py\",
line 158, in main\n
 sdk_pipeline_options.view_as(ProfilingOptions))).run()\n  File
\"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py\",
line 213, in run\nfor work_request in
self._control_stub.Control(get_responses()):\n  File
\"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line
416, in __next__\nreturn self._next()\n  File
\"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line
706, in _next\nraise self\ngrpc._channel._MultiThreadedRendezvous:
<_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus =
StatusCode.UNAVAILABLE\n\tdetails = \"keepalive watchdog
timeout\"\n\tdebug_error_string =
\"{\"created\":\"@1596563347.420024732\",\"description\":\"Error received
from peer
ipv6:[::1]:40823\",\"file\":\"src/core/lib/surface/call.cc\",\"file_line\":1055,\"grpc_message\":\"keepalive
watchdog timeout\",\"grpc_status\":14}\"\n>" trace: "Traceback (most recent
call last):\n  File
\"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py\",
line 158, in main\n
 sdk_pipeline_options.view_as(ProfilingOptions))).run()\n  File
\"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py\",
line 213, in run\nfor work_request in
self._control_stub.Control(get_responses()):\n  File
\"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line
416, in __next__\nreturn self._next()\n  File
\"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line
706, in _next\nraise self\ngrpc._channel._MultiThreadedRendezvous:
<_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus =
StatusCode.UNAVAILABLE\n\tdetails = \"keepalive watchdog
timeout\"\n\tdebug_error_string =
\"{\"created\":\"@1596563347.420024732\",\"description\":\"Error received
from peer
ipv6:[::1]:40823\",\"file\":\"src/core/lib/surface/call.cc\",\"file_line\":1055,\"grpc_message\":\"keepalive
watchdog timeout\",\"grpc_status\":14}\"\n>\n" log_location:
"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:161"
thread: "MainThread"
Traceback (most recent call last):
  File "/usr/lib64/python3.7/runpy.py", line 193, in _run_module_as_main
"__main__", mod_spec)
  File "/usr/lib64/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File
"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
line 248, in <module>
main(sys.argv)
  File
"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
line 158, in main
sdk_pipeline_options.view_as(ProfilingOptions))).run()
  File
"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 213, in run
for work_request in self._control_stub.Control(get_responses()):
  File "/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py",
line 416, in __next__
return self._next()
  File "/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py",
line 706, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC
that terminated with:
status = StatusCode.UNAVAILABLE
details = "keepalive watchdog timeout"
debug_error_string =
"{"created":"@1596563347.420024732","description":"Error received from peer
ipv6:[::1]:40823","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"keepalive
watchdog timeout","grpc_status":14}"
```

I originally raised the issue in tensorflow-data-validation community but
we couldn't come up with any solution.
https://github.com/tensorflow/data-validation/issues/133

The Beam version is 2.22.0. Please let me know if I missed anything.

Thanks,
Junjian