[jira] [Commented] (BEAM-9118) apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses is flaky

2020-06-08 Thread Beam JIRA Bot (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17128841#comment-17128841
 ] 

Beam JIRA Bot commented on BEAM-9118:
---

This issue was marked "stale-assigned" and has not received a public comment in 
7 days. It is now automatically unassigned. If you are still working on it, you 
can assign it to yourself again. Please also give an update about the status of 
the work.

> apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
>  is flaky
> 
>
>                 Key: BEAM-9118
>                 URL: https://issues.apache.org/jira/browse/BEAM-9118
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Valentyn Tymofieiev
>            Priority: P1
>              Labels: beam-fixit, flake

[jira] [Commented] (BEAM-9118) apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses is flaky

2020-06-01 Thread Kenneth Knowles (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17121726#comment-17121726
 ] 

Kenneth Knowles commented on BEAM-9118:
---

This issue is assigned but has not received an update in 30 days so it has been 
labeled "stale-assigned". If you are still working on the issue, please give an 
update and remove the label. If you are no longer working on the issue, please 
unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.

> apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
>  is flaky
> 
>
>                 Key: BEAM-9118
>                 URL: https://issues.apache.org/jira/browse/BEAM-9118
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Valentyn Tymofieiev
>            Assignee: Robert Bradshaw
>            Priority: P2
>              Labels: stale-assigned

[jira] [Commented] (BEAM-9118) apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses is flaky

2020-01-14 Thread Robert Bradshaw (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17015534#comment-17015534
 ] 

Robert Bradshaw commented on BEAM-9118:
---

I'm not aware of anything that has changed here recently.

On Tue, Jan 14, 2020 at 3:44 PM Valentyn Tymofieiev (Jira)


> apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
>  is flaky
> 
>
>                 Key: BEAM-9118
>                 URL: https://issues.apache.org/jira/browse/BEAM-9118
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Valentyn Tymofieiev
>            Assignee: Robert Bradshaw
>            Priority: Major

[jira] [Commented] (BEAM-9118) apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses is flaky

2020-01-14 Thread Valentyn Tymofieiev (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17015491#comment-17015491
 ] 

Valentyn Tymofieiev commented on BEAM-9118:
---

Robert, do you know if anything happened recently that could increase the 
flakiness of FnAPI/Portable runner tests?
See also: https://issues.apache.org/jira/browse/BEAM-9119

> apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
>  is flaky
> 
>
>                 Key: BEAM-9118
>                 URL: https://issues.apache.org/jira/browse/BEAM-9118
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Valentyn Tymofieiev
>            Assignee: Robert Bradshaw
>            Priority: Major

[jira] [Commented] (BEAM-9118) apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses is flaky

2020-01-14 Thread Valentyn Tymofieiev (Jira)


[ 
https://issues.apache.org/jira/browse/BEAM-9118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17015483#comment-17015483
 ] 

Valentyn Tymofieiev commented on BEAM-9118:
---

cc: [~goenka] [~robertwb]

> apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
>  is flaky
> 
>
>                 Key: BEAM-9118
>                 URL: https://issues.apache.org/jira/browse/BEAM-9118
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>            Reporter: Valentyn Tymofieiev
>            Priority: Major
>
> Sample errors:
> https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1373
> {noformat}
> 14:30:12  self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_unfusable_side_inputs>
> 14:30:12
> 14:30:12      def test_pardo_unfusable_side_inputs(self):
> 14:30:12        def cross_product(elem, sides):
> 14:30:12          for side in sides:
> 14:30:12            yield elem, side
> 14:30:12        with self.create_pipeline() as p:
> 14:30:12          pcoll = p | beam.Create(['a', 'b'])
> 14:30:12          assert_that(
> 14:30:12              pcoll | beam.FlatMap(cross_product, beam.pvalue.AsList(pcoll)),
> 14:30:12              equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))
> 14:30:12
> 14:30:12        with self.create_pipeline() as p:
> 14:30:12          pcoll = p | beam.Create(['a', 'b'])
> 14:30:12          derived = ((pcoll,) | beam.Flatten()
> 14:30:12                     | beam.Map(lambda x: (x, x))
> 14:30:12                     | beam.GroupByKey()
> 14:30:12                     | 'Unkey' >> beam.Map(lambda kv: kv[0]))
> 14:30:12          assert_that(
> 14:30:12              pcoll | beam.FlatMap(cross_product, beam.pvalue.AsList(derived)),
> 14:30:12  >           equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')]))
> 14:30:12
> 14:30:12  apache_beam/runners/portability/fn_api_runner_test.py:258: 
> 14:30:12  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> 14:30:12  apache_beam/pipeline.py:481: in __exit__
> 14:30:12      self.run().wait_until_finish()
> 14:30:12  apache_beam/runners/portability/portable_runner.py:445: in wait_until_finish
> 14:30:12      for state_response in self._state_stream:
> 14:30:12  target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_channel.py:416: in __next__
> 14:30:12      return self._next()
> 14:30:12  target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_channel.py:694: in _next
> 14:30:12      _common.wait(self._state.condition.wait, _response_ready)
> 14:30:12  target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_common.py:140: in wait
> 14:30:12      _wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)
> 14:30:12  target/.tox-py36-gcp-pytest/py36-gcp-pytest/lib/python3.6/site-packages/grpc/_common.py:105: in _wait_once
> 14:30:12      wait_fn(timeout=timeout)
> 14:30:12  /usr/lib/python3.6/threading.py:299: in wait
> 14:30:12      gotit = waiter.acquire(True, timeout)
> 14:30:12  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> 14:30:12
> 14:30:12  signum = 14, frame = 
> 14:30:12
> 14:30:12      def handler(signum, frame):
> 14:30:12        msg = 'Timed out after %s seconds.' % self.TIMEOUT_SECS
> 14:30:12        print('=' * 20, msg, '=' * 20)
> 14:30:12        traceback.print_stack(frame)
> 14:30:12        threads_by_id = {th.ident: th for th in threading.enumerate()}
> 14:30:12        for thread_id, stack in sys._current_frames().items():
> 14:30:12          th = threads_by_id.get(thread_id)
> 14:30:12          print()
> 14:30:12          print('# Thread:', th or thread_id)
> 14:30:12          traceback.print_stack(stack)
> 14:30:12  >     raise BaseException(msg)
> 14:30:12  E     BaseException: Timed out after 60 seconds.
> 14:30:12
> 14:30:12  apache_beam/runners/portability/portable_runner_test.py:77: BaseException
> {noformat}
> https://builds.apache.org/job/beam_PreCommit_Python_Phrase/1366/
> {noformat}
> 09:06:01  self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_assert_that>
> 09:06:01
> 09:06:01      def test_assert_that(self):
> 09:06:01        # TODO: figure out a way for fn_api_runner to parse and raise the
> 09:06:01        # underlying exception.
> 09:06:01        with self.assertRaisesRegex(Exception, 'Failed assert'):
> 09:06:01          with self.create_pipeline() as p:
> 09:06:01  >         assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
> 09:06:01  E         AssertionError: "Failed assert" does not match "Pipeline timed out waiting for job service subprocess."
> {noformat}
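
Note on the first trace: signum = 14 is SIGALRM on Linux, so the 60-second limit appears to be enforced by an alarm signal whose handler prints every thread's stack and then raises. Below is a minimal, self-contained sketch of that pattern. The names run_with_timeout and TimedOut are hypothetical and not part of Beam; the code that actually arms the alarm in portable_runner_test.py is not shown in this thread. The sketch is Unix-only and must run in the main thread.

```python
import signal
import sys
import threading
import traceback


class TimedOut(BaseException):
    """Hypothetical marker exception.

    Derives from BaseException, like the handler in the trace, so that a
    broad `except Exception` in the code under test cannot swallow it.
    """


def run_with_timeout(fn, timeout_secs):
    """Run fn() and raise TimedOut if it does not return in time."""

    def handler(signum, frame):
        msg = 'Timed out after %s seconds.' % timeout_secs
        # Dump the stack of every live thread to help locate the hang.
        threads_by_id = {th.ident: th for th in threading.enumerate()}
        for thread_id, stack in sys._current_frames().items():
            th = threads_by_id.get(thread_id)
            print('# Thread:', th or thread_id, file=sys.stderr)
            traceback.print_stack(stack, file=sys.stderr)
        raise TimedOut(msg)

    previous = signal.signal(signal.SIGALRM, handler)
    # setitimer accepts fractional seconds; signal.alarm() takes integers only.
    signal.setitimer(signal.ITIMER_REAL, timeout_secs)
    try:
        return fn()
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, previous)  # restore the old handler
```

Because the handler raises in the main thread, a call blocked in a lock wait (as in the gRPC frames above) is interrupted and the exception surfaces at the point where the test was waiting.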



--
This message was sent by Atlassian Jira
(v8.3.4#803005)