Re: Out-of-orderness of window results when testing stateful operators with TextIO
Thanks Reuven for the input and Wang for CC'ing to Reuven. Generally you should not rely on PCollection being ordered Is it because Beam splits PCollection into multiple input splits and tries to process it as efficiently as possible without considering times? This one is very confusing as I've been using Flink for a long time; AFAIK, Flink DataStream API guarantees ordering for the same key between two different tasks. Best, Dongwon On Tue, Aug 25, 2020 at 12:56 AM Reuven Lax wrote: > Generally you should not rely on PCollection being ordered, though there > have been discussions about adding some time-ordering semantics. > > > > On Sun, Aug 23, 2020 at 9:06 PM Rui Wang wrote: > >> Current Beam model does not guarantee an ordering after a GBK (i.e. >> Combine.perKey() in your). So you cannot expect that the C step sees >> elements in a specific order. >> >> As I recall on Dataflow runner, there is very limited ordering support. >> Hi +Reuven Lax can share your insights about it? >> >> >> -Rui >> >> >> >> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim >> wrote: >> >>> Hi, >>> >>> My Beam pipeline is designed to work with an unbounded source KafkaIO. >>> It roughly looks like below: >>> p.apply(KafkaIO.read() ...) // (A-1) >>> .apply(WithKeys.of(...).withKeyType(...)) >>> .apply(Window.into(FixedWindows.of(...))) >>> .apply(Combine.perKey(...)) // (B) >>> .apply(Window.into(new GlobalWindows())) // to have per-key stats >>> in (C) >>> .apply(ParDo.of(new MyStatefulDoFn())) // (C) >>> Note that (C) has its own state which is expected to be fetched and >>> updated by window results (B) in order of event-time. >>> >>> Now I'm writing an integration test where (A-1) is replaced by (A-2): >>> p.apply(TextIO.read().from("test.txt")) // (A-2) >>> >>> "text.txt" contains samples having a single key. >>> >>> I get a wrong result and it turns out that window results didn't feed >>> into (C) in order. >>> Is it because (A-2) makes the pipeline a bounded one? >>> >>> Q1. How to prevent this from happening? >>> Q2. How do you guys usually write an integration test for an unbounded >>> one with stateful function? >>> >>> Best, >>> >>> Dongwon >>> >>
Re: Need Support for ElasticSearch 7.x for beam
hello Kyle, Thanks a lot for your prompt reply. Hmm.. strange, I think I was getting some error that version was not supported. Not sure if I updated my beam version last time when I tried with ES 7.x. Let me try it again and let you know. Thanks and Regards Mohil On Mon, Aug 24, 2020 at 11:54 AM Kyle Weaver wrote: > This ticket indicates Elasticsearch 7.x has been supported since Beam > 2.19: https://issues.apache.org/jira/browse/BEAM-5192 > > Are there any specific features you need that aren't supported? > > On Mon, Aug 24, 2020 at 11:33 AM Mohil Khare wrote: > >> Hello, >> >> Firstly I am on java sdk 2.23.0 and we heavily use Elasticsearch as one >> of our sinks. >> >> It's been a while since beam got upgraded to support elasticsearch >> version greater than 6.x. >> Elasticsearch has now moved on to 7.x and we want to use some of their >> new security features. >> >> I want to know which version of beam will support elasticsearch version >> 7.x. >> I see the following commit, but it looks like it hasn't been merged with >> master yet. >> >> https://github.com/apache/beam/pull/10023 >> >> >> Thanks and regards >> Mohil >> >> >> >>
Re: How to integrate Beam SQL windowing query with KafkaIO?
Hi, I checked the query in your SO question and I think the SQL usage is correct. My current guess is that the problem is how does watermark generate and advance in KafkaIO. It could be either the watermark didn't pass the end of your SQL window for aggregation or the data was lagging behind the watermark so they are considered late data. One way to verify it is you can try to use TestStream as the source to evaluate your pipeline and see whether it works well. -Rui On Mon, Aug 24, 2020 at 11:06 AM Minreng Wu wrote: > Hi contributors, > > Sorry to bother you! I met a problem when I was trying to apply a > windowing aggregation Beam SQL query to a Kafka input source. > > The details of the question are in the following link: > https://stackoverflow.com/questions/63566057/how-to-integrate-beam-sql-windowing-query-with-kafkaio. > And the version of the Beam Java SDK I used is *2.23.0* > > Really appreciate your help and advice! Stay safe and happy! > > Thanks and regards, > Minreng >
Re: Need Support for ElasticSearch 7.x for beam
This ticket indicates Elasticsearch 7.x has been supported since Beam 2.19: https://issues.apache.org/jira/browse/BEAM-5192 Are there any specific features you need that aren't supported? On Mon, Aug 24, 2020 at 11:33 AM Mohil Khare wrote: > Hello, > > Firstly I am on java sdk 2.23.0 and we heavily use Elasticsearch as one of > our sinks. > > It's been a while since beam got upgraded to support elasticsearch version > greater than 6.x. > Elasticsearch has now moved on to 7.x and we want to use some of their new > security features. > > I want to know which version of beam will support elasticsearch version > 7.x. > I see the following commit, but it looks like it hasn't been merged with > master yet. > > https://github.com/apache/beam/pull/10023 > > > Thanks and regards > Mohil > > > >
Need Support for ElasticSearch 7.x for beam
Hello, Firstly I am on java sdk 2.23.0 and we heavily use Elasticsearch as one of our sinks. It's been a while since beam got upgraded to support elasticsearch version greater than 6.x. Elasticsearch has now moved on to 7.x and we want to use some of their new security features. I want to know which version of beam will support elasticsearch version 7.x. I see the following commit, but it looks like it hasn't been merged with master yet. https://github.com/apache/beam/pull/10023 Thanks and regards Mohil
How to integrate Beam SQL windowing query with KafkaIO?
Hi contributors, Sorry to bother you! I met a problem when I was trying to apply a windowing aggregation Beam SQL query to a Kafka input source. The details of the question are in the following link: https://stackoverflow.com/questions/63566057/how-to-integrate-beam-sql-windowing-query-with-kafkaio. And the version of the Beam Java SDK I used is *2.23.0* Really appreciate your help and advice! Stay safe and happy! Thanks and regards, Minreng
Re: Out-of-orderness of window results when testing stateful operators with TextIO
As for the question of writing tests in the face of non-determinism, you should look into TestStream. MyStatefulDoFn still needs to be updated to not assume an ordering. (This can be done by setting timers that provide guarantees that (modulo late data) one has seen all data up to a certain timestamp.) On Mon, Aug 24, 2020 at 8:56 AM Reuven Lax wrote: > > Generally you should not rely on PCollection being ordered, though there have > been discussions about adding some time-ordering semantics. > > > > On Sun, Aug 23, 2020 at 9:06 PM Rui Wang wrote: >> >> Current Beam model does not guarantee an ordering after a GBK (i.e. >> Combine.perKey() in your). So you cannot expect that the C step sees >> elements in a specific order. >> >> As I recall on Dataflow runner, there is very limited ordering support. Hi >> +Reuven Lax can share your insights about it? >> >> >> -Rui >> >> >> >> On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim wrote: >>> >>> Hi, >>> >>> My Beam pipeline is designed to work with an unbounded source KafkaIO. >>> It roughly looks like below: >>> p.apply(KafkaIO.read() ...) // (A-1) >>> .apply(WithKeys.of(...).withKeyType(...)) >>> .apply(Window.into(FixedWindows.of(...))) >>> .apply(Combine.perKey(...)) // (B) >>> .apply(Window.into(new GlobalWindows())) // to have per-key stats in >>> (C) >>> .apply(ParDo.of(new MyStatefulDoFn())) // (C) >>> Note that (C) has its own state which is expected to be fetched and updated >>> by window results (B) in order of event-time. >>> >>> Now I'm writing an integration test where (A-1) is replaced by (A-2): p.apply(TextIO.read().from("test.txt")) // (A-2) >>> >>> "text.txt" contains samples having a single key. >>> >>> I get a wrong result and it turns out that window results didn't feed into >>> (C) in order. >>> Is it because (A-2) makes the pipeline a bounded one? >>> >>> Q1. How to prevent this from happening? >>> Q2. How do you guys usually write an integration test for an unbounded one >>> with stateful function? >>> >>> Best, >>> >>> Dongwon
Re: tensorflow-data-validation(with direct runner) failed to process large data because of grpc timeout on workers
Another person reported something similar for Dataflow and it seemed as though in their scenario they were using locks and either got into a deadlock or starved processing for long enough that the watchdog also failed. Are you using locks and/or having really long single element processing times? On Mon, Aug 24, 2020 at 1:50 AM Junjian Xu wrote: > Hi, > > I’m running into a problem of tensorflow-data-validation with direct > runner to generate statistics from some large datasets over 400GB. > > It seems that all workers stopped working after an error message of > “Keepalive watchdog fired. Closing transport.” It seems to be a grpc > keepalive timeout. > > ``` > E0804 17:49:07.419950276 44806 chttp2_transport.cc:2881] > ipv6:[::1]:40823: Keepalive watchdog fired. Closing transport. > 2020-08-04 17:49:07 local_job_service.py : INFO Worker: severity: ERROR > timestamp { seconds: 1596563347 nanos: 420487403 } message: "Python sdk > harness failed: \nTraceback (most recent call last):\n File > \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py\", > line 158, in main\n > sdk_pipeline_options.view_as(ProfilingOptions))).run()\n File > \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py\", > line 213, in run\nfor work_request in > self._control_stub.Control(get_responses()):\n File > \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line > 416, in __next__\nreturn self._next()\n File > \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line > 706, in _next\nraise self\ngrpc._channel._MultiThreadedRendezvous: > <_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = > StatusCode.UNAVAILABLE\n\tdetails = \"keepalive watchdog > timeout\"\n\tdebug_error_string = > \"{\"created\":\"@1596563347.420024732\",\"description\":\"Error received > from peer > ipv6:[::1]:40823\",\"file\":\"src/core/lib/surface/call.cc\",\"file_line\":1055,\"grpc_message\":\"keepalive > watchdog timeout\",\"grpc_status\":14}\"\n>" trace: "Traceback (most recent > call last):\n File > \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py\", > line 158, in main\n > sdk_pipeline_options.view_as(ProfilingOptions))).run()\n File > \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py\", > line 213, in run\nfor work_request in > self._control_stub.Control(get_responses()):\n File > \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line > 416, in __next__\nreturn self._next()\n File > \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line > 706, in _next\nraise self\ngrpc._channel._MultiThreadedRendezvous: > <_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = > StatusCode.UNAVAILABLE\n\tdetails = \"keepalive watchdog > timeout\"\n\tdebug_error_string = > \"{\"created\":\"@1596563347.420024732\",\"description\":\"Error received > from peer > ipv6:[::1]:40823\",\"file\":\"src/core/lib/surface/call.cc\",\"file_line\":1055,\"grpc_message\":\"keepalive > watchdog timeout\",\"grpc_status\":14}\"\n>\n" log_location: > "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:161" > thread: "MainThread" > Traceback (most recent call last): > File "/usr/lib64/python3.7/runpy.py", line 193, in _run_module_as_main > "__main__", mod_spec) > File "/usr/lib64/python3.7/runpy.py", line 85, in _run_code > exec(code, run_globalse > File > "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py", > line 248, in > main(sys.argv) > File > "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py", > line 158, in main > sdk_pipeline_options.view_as(ProfilingOptions))).run() > File > "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", > line 213, in run > for work_request in self._control_stub.Control(get_responses()): > File "/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py", > line 416, in __next__ > return self._next() > File "/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py", > line 706, in _next > raise self > grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC > that terminated with: > status = StatusCode.UNAVAILABLE > details = "keepalive watchdog timeout" > debug_error_string = > "{"created":"@1596563347.420024732","description":"Error received from peer > ipv6:[::1]:40823","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"keepalive > watchdog timeout","grpc_status":14}" > ``` > > I originally raised the issue in tensorflow-data-validation community but > we couldn't come up with any solution. > https://github.com/tensorflow/data-validation/issues/133 > > The beam version is 2.22.0. Please let me know if I
Re: Out-of-orderness of window results when testing stateful operators with TextIO
Generally you should not rely on PCollection being ordered, though there have been discussions about adding some time-ordering semantics. On Sun, Aug 23, 2020 at 9:06 PM Rui Wang wrote: > Current Beam model does not guarantee an ordering after a GBK (i.e. > Combine.perKey() in your). So you cannot expect that the C step sees > elements in a specific order. > > As I recall on Dataflow runner, there is very limited ordering support. Hi > +Reuven > Lax can share your insights about it? > > > -Rui > > > > On Sun, Aug 23, 2020 at 8:32 PM Dongwon Kim wrote: > >> Hi, >> >> My Beam pipeline is designed to work with an unbounded source KafkaIO. >> It roughly looks like below: >> p.apply(KafkaIO.read() ...) // (A-1) >> .apply(WithKeys.of(...).withKeyType(...)) >> .apply(Window.into(FixedWindows.of(...))) >> .apply(Combine.perKey(...)) // (B) >> .apply(Window.into(new GlobalWindows())) // to have per-key stats >> in (C) >> .apply(ParDo.of(new MyStatefulDoFn())) // (C) >> Note that (C) has its own state which is expected to be fetched and >> updated by window results (B) in order of event-time. >> >> Now I'm writing an integration test where (A-1) is replaced by (A-2): >> >>> p.apply(TextIO.read().from("test.txt")) // (A-2) >> >> "text.txt" contains samples having a single key. >> >> I get a wrong result and it turns out that window results didn't feed >> into (C) in order. >> Is it because (A-2) makes the pipeline a bounded one? >> >> Q1. How to prevent this from happening? >> Q2. How do you guys usually write an integration test for an unbounded >> one with stateful function? >> >> Best, >> >> Dongwon >> >
tensorflow-data-validation(with direct runner) failed to process large data because of grpc timeout on workers
Hi, I’m running into a problem of tensorflow-data-validation with direct runner to generate statistics from some large datasets over 400GB. It seems that all workers stopped working after an error message of “Keepalive watchdog fired. Closing transport.” It seems to be a grpc keepalive timeout. ``` E0804 17:49:07.419950276 44806 chttp2_transport.cc:2881] ipv6:[::1]:40823: Keepalive watchdog fired. Closing transport. 2020-08-04 17:49:07 local_job_service.py : INFO Worker: severity: ERROR timestamp { seconds: 1596563347 nanos: 420487403 } message: "Python sdk harness failed: \nTraceback (most recent call last):\n File \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py\", line 158, in main\n sdk_pipeline_options.view_as(ProfilingOptions))).run()\n File \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py\", line 213, in run\nfor work_request in self._control_stub.Control(get_responses()):\n File \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line 416, in __next__\nreturn self._next()\n File \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line 706, in _next\nraise self\ngrpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = StatusCode.UNAVAILABLE\n\tdetails = \"keepalive watchdog timeout\"\n\tdebug_error_string = \"{\"created\":\"@1596563347.420024732\",\"description\":\"Error received from peer ipv6:[::1]:40823\",\"file\":\"src/core/lib/surface/call.cc\",\"file_line\":1055,\"grpc_message\":\"keepalive watchdog timeout\",\"grpc_status\":14}\"\n>" trace: "Traceback (most recent call last):\n File \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py\", line 158, in main\n sdk_pipeline_options.view_as(ProfilingOptions))).run()\n File \"/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py\", line 213, in run\nfor work_request in self._control_stub.Control(get_responses()):\n File \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line 416, in __next__\nreturn self._next()\n File \"/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py\", line 706, in _next\nraise self\ngrpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:\n\tstatus = StatusCode.UNAVAILABLE\n\tdetails = \"keepalive watchdog timeout\"\n\tdebug_error_string = \"{\"created\":\"@1596563347.420024732\",\"description\":\"Error received from peer ipv6:[::1]:40823\",\"file\":\"src/core/lib/surface/call.cc\",\"file_line\":1055,\"grpc_message\":\"keepalive watchdog timeout\",\"grpc_status\":14}\"\n>\n" log_location: "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py:161" thread: "MainThread" Traceback (most recent call last): File "/usr/lib64/python3.7/runpy.py", line 193, in _run_module_as_main "__main__", mod_spec) File "/usr/lib64/python3.7/runpy.py", line 85, in _run_code exec(code, run_globalse File "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 248, in main(sys.argv) File "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker_main.py", line 158, in main sdk_pipeline_options.view_as(ProfilingOptions))).run() File "/home/ec2-user/lib64/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 213, in run for work_request in self._control_stub.Control(get_responses()): File "/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py", line 416, in __next__ return self._next() File "/home/ec2-user/lib64/python3.7/site-packages/grpc/_channel.py", line 706, in _next raise self grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "keepalive watchdog timeout" debug_error_string = "{"created":"@1596563347.420024732","description":"Error received from peer ipv6:[::1]:40823","file":"src/core/lib/surface/call.cc","file_line":1055,"grpc_message":"keepalive watchdog timeout","grpc_status":14}" ``` I originally raised the issue in tensorflow-data-validation community but we couldn't come up with any solution. https://github.com/tensorflow/data-validation/issues/133 The beam version is 2.22.0. Please let me know if I missed anything. Thanks, Junjian