[jira] [Commented] (BEAM-3377) assert_that not working for streaming

2018-05-16 Thread JIRA

[ https://issues.apache.org/jira/browse/BEAM-3377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16478493#comment-16478493 ]

María GH commented on BEAM-3377:


I added streaming_wordcount_debugging.py, including:
 *  PrintFn (DoFn) to inspect element, window, and timestamp.
 *  AddTimestampFn (DoFn) to modify timestamps.
 *  assert_that via check_gbk_format and equal_to_per_window (matchers).
 *  Replaced the parameter custom_windowing with use_global_window, to reuse the workflow's windowing when possible.
 *  Updated test_stream_test.py to use assert_that with either global-window or per-window matching, as appropriate.

This works in the DirectRunner, but fails in the DataflowRunner for two reasons:
 1) Using a controlled emission of messages: streaming_wordcount_it_test fails when using AddTimestampDoFn.
 2) Using a simple matcher to check the format (word:count), it reports:
 Caused by: java.lang.ClassCastException: org.apache.beam.sdk.transforms.windowing.IntervalWindow cannot be cast to org.apache.beam.sdk.transforms.windowing.GlobalWindow.
 I have created the private branch [assert-addts|https://github.com/mariapython/incubator-beam/tree/assert-addts] with two commits, one for each of the problems above, so that this work can be revisited in the future:
 1) Show AddTimestampDoFn fails Dataflow streaming wordcount
 2) Show a simple format checker fails Dataflow streaming wordcount

> assert_that not working for streaming
> -
>
> Key: BEAM-3377
> URL: https://issues.apache.org/jira/browse/BEAM-3377
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.2.0
>Reporter: María GH
>Priority: Major
>  Labels: starter
>  Time Spent: 4h
>  Remaining Estimate: 0h
>
> assert_that does not work for AfterWatermark timers.
> An easy way to reproduce it is to modify test_gbk_execution [1] as follows:
>  
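> {code:python}
> from __future__ import print_function
>
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.options.pipeline_options import StandardOptions
> from apache_beam.testing.test_pipeline import TestPipeline
> from apache_beam.testing.test_stream import TestStream
> from apache_beam.testing.util import assert_that, equal_to
> from apache_beam.transforms import trigger
> from apache_beam.transforms.window import FixedWindows
>
> # Method of a unittest.TestCase subclass.
> def test_this(self):
>   test_stream = (TestStream()
>                  .add_elements(['a', 'b', 'c'])
>                  .advance_watermark_to(20))
>
>   def fnc(x):
>     # Print every fired pane, then pass it through unchanged.
>     print('fired_elem:', x)
>     return x
>
>   options = PipelineOptions()
>   options.view_as(StandardOptions).streaming = True
>   p = TestPipeline(options=options)
>   records = (p
>              | test_stream
>              | beam.WindowInto(
>                  FixedWindows(15),
>                  trigger=trigger.AfterWatermark(early=trigger.AfterCount(2)),
>                  accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
>              | beam.Map(lambda x: ('k', x))
>              | beam.GroupByKey()
>              | beam.Map(fnc))
>   assert_that(records, equal_to([('k', ['a', 'b', 'c'])]))
>   p.run()
> {code}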
> This test passes, but if .advance_watermark_to(20) is removed, it fails.
> However, both cases fire the same elements:
>   fired_elem: ('k', ['a', 'b', 'c'])
>   fired_elem: ('k', ['a', 'b', 'c'])
> In the passing case, they correspond to the sorted_actual value inside
> assert_that. In the failing case, two sorted_actual values are printed:
>   sorted_actual: [('k', ['a', 'b', 'c']), ('k', ['a', 'b', 'c'])]
>   sorted_actual: []
> [1] 
> https://github.com/mariapython/incubator-beam/blob/direct-timers-show/sdks/python/apache_beam/testing/test_stream_test.py#L120
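A toy, pure-Python model may help show why the printed sorted_actual values fail the check. It is not Beam code: accumulating_panes and sorted_equal are hypothetical helpers. It assumes (a) that in ACCUMULATING mode each fired pane holds every element seen so far in the window, (b) that both firings happened after all three elements had arrived, and (c) that equal_to compares sorted lists. Under those assumptions a single pane matches, while a call that receives both panes at once, or none, does not.

{code:python}
def accumulating_panes(elements, fire_after):
    """Contents of each fired pane in ACCUMULATING mode (toy model).

    fire_after[i] is how many elements had arrived at the i-th firing;
    each pane holds every element seen so far in the window.
    """
    return [sorted(elements[:n]) for n in fire_after]


def sorted_equal(actual, expected):
    """Hypothetical stand-in for equal_to's sorted comparison."""
    return sorted(actual) == sorted(expected)


expected = [('k', ['a', 'b', 'c'])]
# Assumption: both firings happened after all three elements arrived.
panes = [('k', pane) for pane in accumulating_panes(['a', 'b', 'c'], [3, 3])]

print(sorted_equal(panes[:1], expected))  # True: a single pane matches
print(sorted_equal(panes, expected))      # False: both panes at once
print(sorted_equal([], expected))         # False: empty input
{code}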



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (BEAM-3377) assert_that not working for streaming

2018-05-11 Thread JIRA

[ https://issues.apache.org/jira/browse/BEAM-3377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472770#comment-16472770 ]

María GH commented on BEAM-3377:


assert_that has been added, and the TODOs that depended on it have been removed.

However, a new issue that needs further investigation has been found: when adding unit tests, elements lost their timestamps (and their windows).


{panel:title=Preliminary investigation}

For a generic unit test of this form:
{code:python}
def test_windowed_value_passes(self):
  expected = ...
  with TestPipeline() as p:
    # Use a separate name so that p stays bound to the pipeline.
    pcoll = (p | 'side TestStream' >> TestStream()
             .add_elements([window.TimestampedValue('s1', 10)]))
    assert_that(pcoll, equal_to_per_window(expected),
                custom_windowing=window.FixedWindows(30), reify_windows=True)
{code}
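For reference, a sketch of fixed-window assignment, assuming zero offset and the usual rule start = t - ((t - offset) mod size). fixed_window is a hypothetical helper, not the Beam API. It shows that 's1' at timestamp 10 should land in [0, 30), which agrees with the AfterWindow line in the results below.

{code:python}
def fixed_window(timestamp, size, offset=0):
    """Return the [start, end) fixed window containing timestamp (sketch)."""
    start = timestamp - (timestamp - offset) % size
    return (start, start + size)


print(fixed_window(10, 30))  # (0, 30): the [0.0, 30.0) window in the output
print(fixed_window(10, 15))  # (0, 15)
print(fixed_window(20, 15))  # (15, 30)
{code}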
With an annotated assert_that pipeline:
{code:python}
def expand(self, pcoll):
  # reify_windows, custom_windowing and matcher come from the enclosing
  # assert_that(...) call.
  if reify_windows:
    pcoll = pcoll | ParDo(ReifyTimestampWindow())

  keyed_singleton = pcoll.pipeline | Create([(None, None)])
  keyed_actual = (
      pcoll
      # | WindowInto(custom_windowing or window.GlobalWindows())
      | WindowInto(custom_windowing)  # To make sure it takes a custom one
      | 'AfterWindow' >> ParDo(PrintDo('AfterWindow'))
      | "ToVoidKey" >> Map(lambda v: (None, v))
      | 'AfterVoidKey' >> ParDo(PrintDo('AfterVoidKey')))
  plain_actual = ((keyed_singleton, keyed_actual)
                  | "Group" >> CoGroupByKey()
                  | 'AfterCoGBK' >> ParDo(PrintDo('AfterCoGBK'))
                  | "Unkey" >> Map(lambda k_values: k_values[1][1]))

  if custom_windowing:
    plain_actual = plain_actual | "AddWindow" >> ParDo(AddWindow())

  plain_actual = plain_actual | "Match" >> Map(matcher)
{code}
I see these results:
{noformat}
AfterWindow s1 [0.0, 30.0) Timestamp(10)
AfterVoidKey (None, 's1') [0.0, 30.0) Timestamp(10)
AfterCoGBK (None, ([None], ['s1'])) GlobalWindow Timestamp(-9223372036854.775808)
{noformat}
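The AfterCoGBK line can be reproduced with a toy, pure-Python model of the void-key plus singleton CoGroupByKey pattern in expand(). co_group_by_key and run_assert are hypothetical helpers that ignore windows and timestamps entirely, which is precisely the part that goes wrong above. As I understand it, the (None, None) singleton is there so that the matcher still runs, with an empty list, when the actual PCollection is empty.

{code:python}
from collections import defaultdict


def co_group_by_key(*pcolls):
    """Toy CoGroupByKey over lists of (key, value) pairs.

    Returns a list of (key, (values_from_input_0, values_from_input_1, ...)).
    """
    grouped = defaultdict(lambda: tuple([] for _ in pcolls))
    for index, pcoll in enumerate(pcolls):
        for key, value in pcoll:
            grouped[key][index].append(value)
    return list(grouped.items())


def run_assert(actual, matcher):
    """Mimic the keying and unkeying steps of the annotated expand()."""
    keyed_singleton = [(None, None)]
    keyed_actual = [(None, v) for v in actual]
    grouped = co_group_by_key(keyed_singleton, keyed_actual)
    # "Unkey": keep only the values that came from keyed_actual.
    return [matcher(k_values[1][1]) for k_values in grouped]


print(co_group_by_key([(None, None)], [(None, 's1')]))
# [(None, ([None], ['s1']))]
print(run_assert([], len))      # [0]: the matcher still runs on empty input
print(run_assert(['s1'], len))  # [1]
{code}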
 I also tried the test with a batch pipeline:
{code}
p = p | Create(['s1']) | Map(lambda e: window.TimestampedValue(e, 10)) ...

{code}
The same result was observed.
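The timestamp on the AfterCoGBK line appears to be the smallest signed 64-bit number of microseconds, -2**63, which I believe is the value of MIN_TIMESTAMP in apache_beam.utils.timestamp. The sketch below is a hypothetical re-implementation of the Timestamp(...) formatting seen in the output, not the SDK's own code. It suggests that the element reaching the matcher carries the minimum timestamp instead of Timestamp(10).

{code:python}
def format_timestamp(micros):
    """Format microseconds the way the Timestamp(...) lines above appear."""
    sign = '-' if micros < 0 else ''
    int_part, frac_part = divmod(abs(micros), 10**6)
    if frac_part:
        return 'Timestamp(%s%d.%06d)' % (sign, int_part, frac_part)
    return 'Timestamp(%s%d)' % (sign, int_part)


MIN_MICROS = -(2**63)  # smallest signed 64-bit value
print(format_timestamp(10 * 10**6))  # Timestamp(10)
print(format_timestamp(MIN_MICROS))  # Timestamp(-9223372036854.775808)
{code}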

The annotated (debugging) code for the tests and assert_that is in the private branch
[assert_exp|https://github.com/mariapython/incubator-beam/blob/assert-exp/sdks/python/apache_beam/testing/util.py].

{panel}

> assert_that not working for streaming
> -
>
> Key: BEAM-3377
> URL: https://issues.apache.org/jira/browse/BEAM-3377
> Project: Beam
>  Issue Type: Bug
>  Components: sdk-py-core
>Affects Versions: 2.2.0
>Reporter: María GH
>Priority: Major
>  Labels: starter
>  Time Spent: 3h 50m
>  Remaining Estimate: 0h
>


