Re: [DISCUSS] How to stop SdkWorker in SdkHarness

2019-10-21 Thread jincheng sun
Hi Luke,

Thanks a lot for your reply. Since one SDK harness can be shared between
multiple executable stages, the control service termination may occur much
later than the completion of an executable stage. This is the main reason I
prefer runners to control the teardown of DoFns.

Regarding "SDK harnesses can terminate instances any time they want and
start new instances anytime as well.", personally I think this does not
conflict with the proposed Approach 1, as the SDK harness can decide what to
do when it receives the teardown request. It could do nothing if the DoFns
have already been torn down, or tear them down if needed.

What do you think?

Best,
Jincheng

Luke Cwik wrote on Tue, Oct 22, 2019 at 2:05 AM:

> Approach 2 is currently the suggested approach[1] for DoFn's to shutdown.
> Note that SDK harnesses can terminate instances any time they want and
> start new instances anytime as well.
>
> Why do you want to expose this logic so that Runners could control it?
>
> 1:
> https://docs.google.com/document/d/1n6s3BOxOPct3uF4UgbbI9O9rpdiKWFH9R6mtVmR7xp0/edit#
>
> On Mon, Oct 21, 2019 at 4:27 AM jincheng sun 
> wrote:
>
>> Hi,
>> I found that `SdkHarness` does not stop the `SdkWorker` when it finishes.
>> We should add logic to stop the `SdkWorker` in `SdkHarness`.  More
>> detail can be found in [1].
>>
>> There are two approaches to solve this issue:
>>
>> Approach 1:  We can add a Fn API for teardown purpose and the runner will
>> teardown a specific bundle descriptor via this teardown Fn API during
>> disposing.
>> Approach 2: The control service termination could be seen as a signal and
>> once SDK harness receives this signal, the teardown of the bundle
>> descriptor will be performed.
>>
>> More detail can be found in [2].
>>
>> Regarding Approach 2, an SDK harness can be shared between multiple
>> executable stages. The control service termination only occurs when all the
>> executable stages sharing the same SDK harness have finished. This means that
>> the teardown of DoFns may not be executed immediately after an executable
>> stage finishes.
>>
>> So, I prefer Approach 1. Welcome any feedback :)
>>
>> Best,
>> Jincheng
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker.py
>> [2]
>> https://docs.google.com/document/d/1sCgy9VQPf9zVXKRquK8P6N4x7aB62GEO8ozkujRSHZg/edit?usp=sharing
>>
>


Re: Issue BEAM-8452

2019-10-21 Thread Pablo Estrada
Hi Noah!
Thanks for contributing this. The change is relatively small, so I think
it's best to discuss it in a pull request, so we can look at the diff.
My preliminary comment would be that the first part makes sense:

if isinstance(schema, (str, unicode)):
  schema = bigquery_tools.parse_table_schema_from_json(schema)


And the second part seems unnecessary, but perhaps I'm missing something.
Why are you reparsing the schema?

elif isinstance(schema, dict):
  schema = bigquery_tools.parse_table_schema_from_json(json.dumps(schema))
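For reference, a minimal standalone sketch of the two parsing paths being
discussed (the one-field schema here is made up); it only illustrates the
normalization itself, not the WriteToBigQuery internals:

import json
from apache_beam.io.gcp import bigquery_tools

# Hypothetical one-field schema, once as a JSON string and once as a dict.
schema_json = '{"fields": [{"name": "id", "type": "INTEGER", "mode": "REQUIRED"}]}'
schema_dict = json.loads(schema_json)

# String input: parse the JSON string directly into a TableSchema.
from_string = bigquery_tools.parse_table_schema_from_json(schema_json)

# Dict input (the branch in question): re-serialize the dict, then parse it.
# The result is an equivalent TableSchema, which is why the round trip looks
# redundant unless dict inputs really do reach this code path.
from_dict = bigquery_tools.parse_table_schema_from_json(json.dumps(schema_dict))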


Thanks! Please submit a pull request, and tag me (@pabloem) to review.
Best
-P.
On Mon, Oct 21, 2019 at 3:38 PM Noah Goodrich  wrote:

> I have created this issue (https://issues.apache.org/jira/browse/BEAM-8452
> ).
>
> The contributors guide suggests that if an issue is your first, it should
> be discussed on this mailing list. I would like to hear thoughts,
> questions, concerns etc on this proposed fix:
>
> def process(self, element, load_job_name_prefix, *schema_side_inputs):
>   # Each load job is assumed to have files respecting these constraints:
>   # 1. Total size of all files < 15 TB (Max size for load jobs)
>   # 2. Total no. of files in a single load job < 10,000
>   # This assumption means that there will always be a single load job
>   # triggered for each partition of files.
>   destination = element[0]
>   files = element[1]
>
>   if callable(self.schema):
>     schema = self.schema(destination, *schema_side_inputs)
>   elif isinstance(self.schema, vp.ValueProvider):
>     schema = self.schema.get()
>   else:
>     schema = self.schema
>
>   if isinstance(schema, (str, unicode)):
>     schema = bigquery_tools.parse_table_schema_from_json(schema)
>   elif isinstance(schema, dict):
>     schema = bigquery_tools.parse_table_schema_from_json(json.dumps(schema))
>


Re: Proposal: Dynamic timer support (BEAM-6857)

2019-10-21 Thread Kenneth Knowles
This seems extremely useful.

I assume you mean `@OnTimer("timers")` in your example. I would suggest
that the parameter annotation be something other than @TimerId since that
annotation is already used for a very similar but different purpose; they
are close enough that it is tempting to pun them, but it is clearer to keep
them distinct IMO. Perhaps @TimerName or @TimerKey or some such.
Alternatively, keep @TimerId in the parameter list and change the
declaration to @TimerFamily("timers"). I think "family" or "group" may be
more clear naming than "map".

At the portability level, this API does seem to be pretty close to a noop
in terms of the messages that need to be sent over the Fn API, so it makes
sense to loosen the protos. By the time the Fn API is in play, all of our
desires to catch errors prior to execution are irrelevant anyhow.

On the other hand, I think DSLs have a different & bigger problem than
this, in that they want to programmatically adjust all the capabilities of
a DoFn. Same goes for wrapping one DoFn in another. Certainly some limited
DSL use cases are addressed by this, but I wouldn't take that as a primary
use case for this feature. Ultimately they are probably better served by
being able to explicitly author a DoFnInvoker and provide it to a variant
of beam:transforms:ParDo where the do_fn field is a serialized DoFnInvoker.
Now that I think about this, I cannot recall why we don't already ship a
DoFnSignature & DoFnInvoker as the payload. That would allow maximum
flexibility in utilizing the portability framework.

Kenn

On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax  wrote:

> BEAM-6857 documents the need for dynamic timer support in the Beam API. I
> wanted to make a proposal for what this API would look like, and how to
> express it in the portability protos.
>
> Background: Today Beam (especially BeamJava) requires a ParDo to
> statically declare all timers it accesses at compile time. For example:
>
> class MyDoFn extends DoFn {
>   @TimerId("timer1") TimerSpec timer1 =
> TimerSpecs.timer(TimeDomain(EVENT_TIME));
>   @TimerId("timer2") TimerSpec timer2 =
> TimerSpecs.timer(TimeDomain(PROCESSING_TIME));
>
>   @ProcessElement
>   public void process(@Element String e, @TimerId("timer1") Timer
> timer1, @TimerId("timer2") Timer timer2)) {
> timer1.set(...);
> timer2.set(...);
>   }
>
>   @OnTimer("timer1") public void onTimer1() { ... }
>   @OnTimer("timer2") public void onTimer2() { ... }
> }
>
> This requires the author of a ParDo to know the full list of timers ahead
> of time, which has been problematic in many cases. One example where it
> causes issues is for DSLs such as Euphoria or Scio. DSL authors usually
> write ParDos to interpret the code written in the high-level DSL, and so
> don't know ahead of time the list of timers needed; alternatives today are
> quite ugly: physical code generation or creating a single timer that
> multiplexes all of the users logical timers. There are also cases where a
> ParDo needs multiple distinct timers, but the set of distinct timers is
> controlled by the input data, and therefore not knowable in advance. The
> Beam timer API has been insufficient for these use cases.
>
> I propose a new TimerMap construct, which allow a ParDo to dynamically set
> named timers. It's use in the Java API would look as follows:
>
> class MyDoFn extends DoFn {
>   @TimerId("timers") TimerSpec timers =
> TimerSpecs.timerMap(TimeDomain(EVENT_TIME));
>
>   @ProcessElement
>   public void process(@Element String e, @TimerId("timers") TimerMap
> timer)) {
> timers.set("timer1", ...);
> timers.set("timer2", ...);
>   }
>
>   @OnTimer("timer") public void onTimer(@TimerId String timerFired,
> @Timestamp Instant timerTs) { ... }
> }
>
> There is a new TimerSpec type to specify a TimerMap. The TimerMap class
> itself allows dynamically setting multiple timers based on a String tag
> argument. Each TimerMap has a single callback which when called is given
> the id of the timer that is currently firing.
>
> It is allowed to have multiple TimerMap objects in a ParDo (and required
> if you want to have both processing-time and event-time timers in the same
> ParDo). Each TimerMap is its own logical namespace. i.e. if the user sets
> timers with the same string tag on different TimerMap objects the timers
> will not collide.
>
> Currently the portability protos were written to mirror the Java API,
> expecting one TimerSpec per timer accessed by the ParDo. I suggest that we
> instead make TimerMap the default for portability, and model the current
> behavior on top of timer map. If this proves problematic for some runners,
> we could instead introduce a new TimerSpec proto to represent TimerMap.
>
> Thoughts?
>
> Reuven
>


Question related to running unit tests in IDE

2019-10-21 Thread Saikat Maitra
Hi,

I am interested in contributing to this issue:

https://issues.apache.org/jira/browse/BEAM-3658

I have followed the contribution guide and was able to build the project
locally using gradlew commands.

I wanted to debug and trace the issue further by running the tests locally
using IntelliJ IDEA, but I am getting the following errors. I looked up the docs
related to running tests (
https://cwiki.apache.org/confluence/display/BEAM/Run+a+single+unit+test)
and common IDE errors (
https://cwiki.apache.org/confluence/display/BEAM/%28FAQ%29+Recovering+from+common+IDE+errors)
but have not found similar errors.

Error:(632, 17) java: cannot find symbol
  symbol:   method
apply(org.apache.beam.sdk.transforms.Values)
  location: interface org.apache.beam.sdk.values.POutput

Error:(169, 26) java: cannot find symbol
  symbol:   class PCollection
  location: class org.apache.beam.sdk.transforms.Watch

Error:(169, 59) java: cannot find symbol
  symbol:   class KV
  location: class org.apache.beam.sdk.transforms.Watch

Please let me know if you have feedback.

Regards,
Saikat


Re: Interactive Beam Example Failing [BEAM-8451]

2019-10-21 Thread Robert Bradshaw
Thanks for trying this out. Yes, this is definitely something that
should be supported (and tested).

On Mon, Oct 21, 2019 at 3:40 PM Igor Durovic  wrote:
>
> Hi everyone,
>
> The interactive beam example using the DirectRunner fails after execution of 
> the last cell. The recursion limit is exceeded during the calculation of the 
> cache label because of a circular reference in the PipelineInfo object.
>
> The constructor for the PipelineInfo class creates a mapping from each 
> pcollection to the transforms that produce and consume it. The issue arises 
> when there exists a transform that is both a producer and a consumer for the 
> same pcollection. This occurs when a transform's expand method returns the 
> same pcoll object that's passed into it. The specific transform causing the 
> failure of the example is MaybeReshuffle, which is used in the Create 
> transform. Replacing "return pcoll" with "return pcoll | Map(lambda x: x)" 
> seems to fix the problem.
>
> A workaround for this issue on the interactive beam side would be fairly 
> simple, but it seems to me that there should be more validation of pipelines 
> to prevent the use of transforms that return the same pcoll that's passed in, 
> or at least a mention of this in the transform style guide. My understanding 
> is that pcollections are produced by a single transform (they even have a 
> field called "producer" that references only one transform). If that's the 
> case then that property of pcollections should be enforced.
>
> I made ticket BEAM-8451 to track this issue.
>
> I'm still new to beam so I apologize if I'm fundamentally misunderstanding 
> something. I'm not exactly sure what the next step should be and would 
> appreciate some recommendations. I can submit a PR to solve the immediate 
> problem of the failing example but the underlying problem should also be 
> addressed at some point. I also apologize if people are already aware of this 
> problem.
>
> Thank You!
> Igor Durovic


Interactive Beam Example Failing [BEAM-8451]

2019-10-21 Thread Igor Durovic
Hi everyone,

The interactive beam example using the DirectRunner fails after execution
of the last cell. The recursion limit is exceeded during the calculation of
the cache label because of a circular reference in the PipelineInfo object.

The constructor for the PipelineInfo class creates a mapping from each
pcollection to the transforms that produce and consume it. The issue arises
when there exists a transform that is both a producer and a consumer for the
same pcollection. This occurs when a transform's expand method returns the
same pcoll object that's passed into it. The specific transform causing the
failure of the example is MaybeReshuffle, which is used in the Create
transform. Replacing "return pcoll" with "return pcoll | Map(lambda x: x)"
seems to fix the problem.
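To make the failure mode concrete, here is a minimal sketch (the class names
are made up; only beam.PTransform and beam.Map are real) of an expand() that
returns its input unchanged versus the identity-Map fix:

import apache_beam as beam

class PassThrough(beam.PTransform):
    # Returns the input PCollection itself, so this transform ends up as both
    # producer and consumer of the same pcoll in a PipelineInfo-style mapping.
    def expand(self, pcoll):
        return pcoll

class PassThroughFixed(beam.PTransform):
    # The identity Map creates a new output PCollection with its own producer,
    # so no cycle is introduced.
    def expand(self, pcoll):
        return pcoll | beam.Map(lambda x: x)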

A workaround for this issue on the interactive beam side would be fairly
simple, but it seems to me that there should be more validation of
pipelines to prevent the use of transforms that return the same pcoll
that's passed in, or at least a mention of this in the transform style
guide. My understanding is that pcollections are produced by a single
transform (they even have a field called "producer" that references only
one transform). If that's the case then that property of pcollections
should be enforced.

I made ticket BEAM-8451 to track this issue.

I'm still new to beam so I apologize if I'm fundamentally misunderstanding
something. I'm not exactly sure what the next step should be and would
appreciate some recommendations. I can submit a PR to solve the immediate
problem of the failing example but the underlying problem should also be
addressed at some point. I also apologize if people are already aware of
this problem.

Thank You!
Igor Durovic


Issue BEAM-8452

2019-10-21 Thread Noah Goodrich
I have created this issue (https://issues.apache.org/jira/browse/BEAM-8452).

The contributors guide suggests that if an issue is your first, it should
be discussed on this mailing list. I would like to hear thoughts,
questions, concerns etc on this proposed fix:

def process(self, element, load_job_name_prefix, *schema_side_inputs):
  # Each load job is assumed to have files respecting these constraints:
  # 1. Total size of all files < 15 TB (Max size for load jobs)
  # 2. Total no. of files in a single load job < 10,000
  # This assumption means that there will always be a single load job
  # triggered for each partition of files.
  destination = element[0]
  files = element[1]

  if callable(self.schema):
    schema = self.schema(destination, *schema_side_inputs)
  elif isinstance(self.schema, vp.ValueProvider):
    schema = self.schema.get()
  else:
    schema = self.schema

  if isinstance(schema, (str, unicode)):
    schema = bigquery_tools.parse_table_schema_from_json(schema)
  elif isinstance(schema, dict):
    schema = bigquery_tools.parse_table_schema_from_json(json.dumps(schema))




CWiki edit rights.

2019-10-21 Thread Mikhail Gryzykhin
Hello everybody,

Just a friendly heads up.

It seems that CWiki changed its authentication approach, and people with non-Apache
logins might have lost the right to edit Beam pages. So if you don't see the
"Edit" button, that might be the case.

For committers: use your Apache LDAP credentials. For others, I guess the process is
the same as before: ask for edit access on the mailing list if needed.

Regards,
Mikhail.


Proposal: Dynamic timer support (BEAM-6857)

2019-10-21 Thread Reuven Lax
BEAM-6857 documents the need for dynamic timer support in the Beam API. I
wanted to make a proposal for what this API would look like, and how to
express it in the portability protos.

Background: Today Beam (especially BeamJava) requires a ParDo to statically
declare all timers it accesses at compile time. For example:

class MyDoFn extends DoFn {
  @TimerId("timer1") TimerSpec timer1 =
TimerSpecs.timer(TimeDomain(EVENT_TIME));
  @TimerId("timer2") TimerSpec timer2 =
TimerSpecs.timer(TimeDomain(PROCESSING_TIME));

  @ProcessElement
  public void process(@Element String e, @TimerId("timer1") Timer
timer1, @TimerId("timer2") Timer timer2) {
timer1.set(...);
timer2.set(...);
  }

  @OnTimer("timer1") public void onTimer1() { ... }
  @OnTimer("timer2") public void onTimer2() { ... }
}

This requires the author of a ParDo to know the full list of timers ahead
of time, which has been problematic in many cases. One example where it
causes issues is for DSLs such as Euphoria or Scio. DSL authors usually
write ParDos to interpret the code written in the high-level DSL, and so
don't know ahead of time the list of timers needed; alternatives today are
quite ugly: physical code generation or creating a single timer that
multiplexes all of the user's logical timers. There are also cases where a
ParDo needs multiple distinct timers, but the set of distinct timers is
controlled by the input data, and therefore not knowable in advance. The
Beam timer API has been insufficient for these use cases.

I propose a new TimerMap construct, which allows a ParDo to dynamically set
named timers. Its use in the Java API would look as follows:

class MyDoFn extends DoFn {
  @TimerId("timers") TimerSpec timers =
TimerSpecs.timerMap(TimeDomain(EVENT_TIME));

  @ProcessElement
  public void process(@Element String e, @TimerId("timers") TimerMap
timers) {
timers.set("timer1", ...);
timers.set("timer2", ...);
  }

  @OnTimer("timer") public void onTimer(@TimerId String timerFired,
@Timestamp Instant timerTs) { ... }
}

There is a new TimerSpec type to specify a TimerMap. The TimerMap class
itself allows dynamically setting multiple timers based on a String tag
argument. Each TimerMap has a single callback which when called is given
the id of the timer that is currently firing.

It is allowed to have multiple TimerMap objects in a ParDo (and required if
you want to have both processing-time and event-time timers in the same
ParDo). Each TimerMap is its own logical namespace; i.e., if the user sets
timers with the same string tag on different TimerMap objects, the timers
will not collide.
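
To pin down the intended semantics (dynamic string tags, a single callback per
family, per-family namespaces), here is a tiny toy model in plain Python; it is
illustrative only and not the proposed Beam API:

class ToyTimerFamily(object):
    """Toy model of a TimerMap: dynamic string tags, one callback per family."""
    def __init__(self, name, callback):
        self.name = name
        self.callback = callback       # invoked as callback(tag, timestamp)
        self._timers = {}              # tag -> timestamp

    def set(self, tag, timestamp):
        # The same tag used on a different family lives in that family's own
        # dict, so the two timers never collide.
        self._timers[tag] = timestamp

    def fire_eligible(self, now):
        for tag, ts in sorted(self._timers.items()):
            if ts <= now:
                self.callback(tag, ts)  # the callback learns which tag fired
        self._timers = {t: ts for t, ts in self._timers.items() if ts > now}

event_timers = ToyTimerFamily("timers", lambda tag, ts: print(tag, ts))
event_timers.set("timer1", 10)
event_timers.set("timer2", 20)
event_timers.fire_eligible(now=15)      # fires only "timer1"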

Currently the portability protos were written to mirror the Java API,
expecting one TimerSpec per timer accessed by the ParDo. I suggest that we
instead make TimerMap the default for portability, and model the current
behavior on top of timer map. If this proves problematic for some runners,
we could instead introduce a new TimerSpec proto to represent TimerMap.

Thoughts?

Reuven


Re: Test failures in python precommit: ZipFileArtifactServiceTest

2019-10-21 Thread Chad Dombrova
thanks again!

On Mon, Oct 21, 2019 at 1:03 PM Robert Bradshaw  wrote:

> I just merged https://github.com/apache/beam/pull/9845 which should
> resolve the issue.
>
> On Mon, Oct 21, 2019 at 12:58 PM Chad Dombrova  wrote:
> >
> > thanks!
> >
> > On Mon, Oct 21, 2019 at 12:47 PM Kyle Weaver 
> wrote:
> >>
> >> This issue is being tracked at
> https://issues.apache.org/jira/browse/BEAM-8416.
> >>
> >> On Mon, Oct 21, 2019 at 9:42 PM Chad Dombrova 
> wrote:
> >>>
> >>> Hi all,
> >>> Is anyone else getting these errors in
> apache_beam.runners.portability.artifact_service_test.ZipFileArtifactServiceTest?
> >>>
> >>> They seem to be taking two forms:
> >>>
> >>> zipfile.BadZipFile: Bad CRC-32 for file
> '/3e3ff9aa4fe679c1bf76383e69bfb5e2167afb945aa30e15f05406cc8f55ad14/9367417d63903350aeb7e092bca792263d4fd82d4912252e014e073a8931b4c1'
> >>>
> >>> zipfile.BadZipFile: Bad magic number for file header
> >>>
> >>> Here are some gradle scans:
> >>>
> >>>
> https://scans.gradle.com/s/b7jd7oyu5f5f6/console-log?task=:sdks:python:test-suites:tox:py37:testPy37Cython#L14473
> >>>
> >>>
> https://scans.gradle.com/s/4iega3kyf5kw2/console-log?task=:sdks:python:test-suites:tox:py37:testPython37#L13749
> >>>
> >>> I got it to go through eventually after 4 tries.
> >>>
> >>> -chad
>


Re: Test failures in python precommit: ZipFileArtifactServiceTest

2019-10-21 Thread Robert Bradshaw
I just merged https://github.com/apache/beam/pull/9845 which should
resolve the issue.

On Mon, Oct 21, 2019 at 12:58 PM Chad Dombrova  wrote:
>
> thanks!
>
> On Mon, Oct 21, 2019 at 12:47 PM Kyle Weaver  wrote:
>>
>> This issue is being tracked at 
>> https://issues.apache.org/jira/browse/BEAM-8416.
>>
>> On Mon, Oct 21, 2019 at 9:42 PM Chad Dombrova  wrote:
>>>
>>> Hi all,
>>> Is anyone else getting these errors in 
>>> apache_beam.runners.portability.artifact_service_test.ZipFileArtifactServiceTest?
>>>
>>> They seem to be taking two forms:
>>>
>>> zipfile.BadZipFile: Bad CRC-32 for file 
>>> '/3e3ff9aa4fe679c1bf76383e69bfb5e2167afb945aa30e15f05406cc8f55ad14/9367417d63903350aeb7e092bca792263d4fd82d4912252e014e073a8931b4c1'
>>>
>>> zipfile.BadZipFile: Bad magic number for file header
>>>
>>> Here are some gradle scans:
>>>
>>> https://scans.gradle.com/s/b7jd7oyu5f5f6/console-log?task=:sdks:python:test-suites:tox:py37:testPy37Cython#L14473
>>>
>>> https://scans.gradle.com/s/4iega3kyf5kw2/console-log?task=:sdks:python:test-suites:tox:py37:testPython37#L13749
>>>
>>> I got it to go through eventually after 4 tries.
>>>
>>> -chad


Re: Test failures in python precommit: ZipFileArtifactServiceTest

2019-10-21 Thread Chad Dombrova
thanks!

On Mon, Oct 21, 2019 at 12:47 PM Kyle Weaver  wrote:

> This issue is being tracked at
> https://issues.apache.org/jira/browse/BEAM-8416.
>
> On Mon, Oct 21, 2019 at 9:42 PM Chad Dombrova  wrote:
>
>> Hi all,
>> Is anyone else getting these errors in
>> apache_beam.runners.portability.artifact_service_test.ZipFileArtifactServiceTest?
>>
>> They seem to be taking two forms:
>>
>> zipfile.BadZipFile: Bad CRC-32 for file 
>> '/3e3ff9aa4fe679c1bf76383e69bfb5e2167afb945aa30e15f05406cc8f55ad14/9367417d63903350aeb7e092bca792263d4fd82d4912252e014e073a8931b4c1'
>>
>> zipfile.BadZipFile: Bad magic number for file header
>>
>> Here are some gradle scans:
>>
>>
>> https://scans.gradle.com/s/b7jd7oyu5f5f6/console-log?task=:sdks:python:test-suites:tox:py37:testPy37Cython#L14473
>>
>>
>> https://scans.gradle.com/s/4iega3kyf5kw2/console-log?task=:sdks:python:test-suites:tox:py37:testPython37#L13749
>>
>> I got it to go through eventually after 4 tries.
>>
>> -chad
>>
>


Re: Test failures in python precommit: ZipFileArtifactServiceTest

2019-10-21 Thread Kyle Weaver
This issue is being tracked at
https://issues.apache.org/jira/browse/BEAM-8416.

On Mon, Oct 21, 2019 at 9:42 PM Chad Dombrova  wrote:

> Hi all,
> Is anyone else getting these errors in
> apache_beam.runners.portability.artifact_service_test.ZipFileArtifactServiceTest?
>
> They seem to be taking two forms:
>
> zipfile.BadZipFile: Bad CRC-32 for file 
> '/3e3ff9aa4fe679c1bf76383e69bfb5e2167afb945aa30e15f05406cc8f55ad14/9367417d63903350aeb7e092bca792263d4fd82d4912252e014e073a8931b4c1'
>
> zipfile.BadZipFile: Bad magic number for file header
>
> Here are some gradle scans:
>
>
> https://scans.gradle.com/s/b7jd7oyu5f5f6/console-log?task=:sdks:python:test-suites:tox:py37:testPy37Cython#L14473
>
>
> https://scans.gradle.com/s/4iega3kyf5kw2/console-log?task=:sdks:python:test-suites:tox:py37:testPython37#L13749
>
> I got it to go through eventually after 4 tries.
>
> -chad
>


Test failures in python precommit: ZipFileArtifactServiceTest

2019-10-21 Thread Chad Dombrova
Hi all,
Is anyone else getting these errors in
apache_beam.runners.portability.artifact_service_test.ZipFileArtifactServiceTest?

They seem to be taking two forms:

zipfile.BadZipFile: Bad CRC-32 for file
'/3e3ff9aa4fe679c1bf76383e69bfb5e2167afb945aa30e15f05406cc8f55ad14/9367417d63903350aeb7e092bca792263d4fd82d4912252e014e073a8931b4c1'

zipfile.BadZipFile: Bad magic number for file header

Here are some gradle scans:

https://scans.gradle.com/s/b7jd7oyu5f5f6/console-log?task=:sdks:python:test-suites:tox:py37:testPy37Cython#L14473

https://scans.gradle.com/s/4iega3kyf5kw2/console-log?task=:sdks:python:test-suites:tox:py37:testPython37#L13749

I got it to go through eventually after 4 tries.

-chad


Re: Are empty bundles allowed by model?

2019-10-21 Thread Robert Bradshaw
Yes, the test should be fixed.

On Mon, Oct 21, 2019 at 11:20 AM Jan Lukavský  wrote:
>
> Hi Robert,
>
> I though it would be that case. ParDoLifecycleTest, however, does not
> currently allow for empty bundles. We have currently worked around this
> in Flink by avoiding the creation of these bundles, but maybe the test
> should be modified so that it adheres to the model [1].
>
> Jan
>
> [1] https://github.com/apache/beam/pull/9846
>
> On 10/21/19 6:00 PM, Robert Bradshaw wrote:
> > Yes, the model allows them.
> >
> > It also takes less work to avoid them in general (e.g. imagine one
> > reshuffles N elements to M > N workers. A priori, one would "start" a
> > bundle and then try to read all data destined for that
> > worker--postponing this until one knows that the set of data for this
> > worker could be an optimization (as could not doing so as a form of
> > speculative execution) but should not be necessary.
> >
> > - Robert
> >
> > On Mon, Oct 21, 2019 at 7:03 AM Jan Lukavský  wrote:
> >> Hi Max,
> >>
> >> that is true, but then we have two orthogonal issues:
> >>
> >>a) correctness - if empty bundles are aligned with the model, then
> >> validates runner tests should take that into account
> >>
> >>b) performance - that can be dealt with in separate JIRA issue, if 
> >> needed
> >>
> >> WDYT?
> >>
> >> Jan
> >>
> >> On 10/21/19 3:22 PM, Maximilian Michels wrote:
> >>> Hi Jan,
> >>>
> >>> I think it is aligned with the model to create empty bundles. The
> >>> question if course, whether it is preferable to avoid them, since the
> >>> Setup/Finish state might be costly, depending on the bundle size and
> >>> the type of DoFn used.
> >>>
> >>> Cheers,
> >>> Max
> >>>
> >>> On 21.10.19 14:13, Kyle Weaver wrote:
>  Nevermind, this is discussed on the PR linked.
> 
>  On Mon, Oct 21, 2019 at 2:11 PM Kyle Weaver   > wrote:
> 
>   Do you know why an empty bundle might be created?
> 
>   On Mon, Oct 21, 2019 at 1:42 PM Jan Lukavský    > wrote:
> 
>   Hi,
> 
>   when debugging a flaky ParDoLifecycleTest in FlinkRunner, I have
>   found a
>   situation, where Flink might create empty bundle - i.e. call
>   @StartBundle immediately followed by @FinishBundle, with no
>   elements
>   inside the bundle. That is what breaks the ParDoLifecycleTest,
>   because
>   the test explicitly assumes, that the sequence of lifecycle
>  methods
>   should be StartBundle -> Process Element -> Finish Bundle. It is
>   easy to
>   modify the test to accept situation of StartBundle ->
>   FinishBundle with
>   no elements ([1]), but the question is, is this allowed by the
>   model? I
>   think there is no reason not to be, but I'd like to be sure.
> 
>   Thanks,
> 
>  Jan
> 
>   [1] https://github.com/apache/beam/pull/9841
> 


Re: Are empty bundles allowed by model?

2019-10-21 Thread Luke Cwik
Yes, please update the test.

On Mon, Oct 21, 2019 at 11:20 AM Jan Lukavský  wrote:

> Hi Robert,
>
> I though it would be that case. ParDoLifecycleTest, however, does not
> currently allow for empty bundles. We have currently worked around this
> in Flink by avoiding the creation of these bundles, but maybe the test
> should be modified so that it adheres to the model [1].
>
> Jan
>
> [1] https://github.com/apache/beam/pull/9846
>
> On 10/21/19 6:00 PM, Robert Bradshaw wrote:
> > Yes, the model allows them.
> >
> > It also takes less work to avoid them in general (e.g. imagine one
> > reshuffles N elements to M > N workers. A priori, one would "start" a
> > bundle and then try to read all data destined for that
> > worker--postponing this until one knows that the set of data for this
> > worker could be an optimization (as could not doing so as a form of
> > speculative execution) but should not be necessary.
> >
> > - Robert
> >
> > On Mon, Oct 21, 2019 at 7:03 AM Jan Lukavský  wrote:
> >> Hi Max,
> >>
> >> that is true, but then we have two orthogonal issues:
> >>
> >>a) correctness - if empty bundles are aligned with the model, then
> >> validates runner tests should take that into account
> >>
> >>b) performance - that can be dealt with in separate JIRA issue, if
> needed
> >>
> >> WDYT?
> >>
> >> Jan
> >>
> >> On 10/21/19 3:22 PM, Maximilian Michels wrote:
> >>> Hi Jan,
> >>>
> >>> I think it is aligned with the model to create empty bundles. The
> >>> question if course, whether it is preferable to avoid them, since the
> >>> Setup/Finish state might be costly, depending on the bundle size and
> >>> the type of DoFn used.
> >>>
> >>> Cheers,
> >>> Max
> >>>
> >>> On 21.10.19 14:13, Kyle Weaver wrote:
>  Nevermind, this is discussed on the PR linked.
> 
>  On Mon, Oct 21, 2019 at 2:11 PM Kyle Weaver   > wrote:
> 
>   Do you know why an empty bundle might be created?
> 
>   On Mon, Oct 21, 2019 at 1:42 PM Jan Lukavský    > wrote:
> 
>   Hi,
> 
>   when debugging a flaky ParDoLifecycleTest in FlinkRunner, I
> have
>   found a
>   situation, where Flink might create empty bundle - i.e. call
>   @StartBundle immediately followed by @FinishBundle, with no
>   elements
>   inside the bundle. That is what breaks the
> ParDoLifecycleTest,
>   because
>   the test explicitly assumes, that the sequence of lifecycle
>  methods
>   should be StartBundle -> Process Element -> Finish Bundle.
> It is
>   easy to
>   modify the test to accept situation of StartBundle ->
>   FinishBundle with
>   no elements ([1]), but the question is, is this allowed by
> the
>   model? I
>   think there is no reason not to be, but I'd like to be sure.
> 
>   Thanks,
> 
>  Jan
> 
>   [1] https://github.com/apache/beam/pull/9841
> 
>


Re: Are empty bundles allowed by model?

2019-10-21 Thread Jan Lukavský

Hi Robert,

I thought that would be the case. ParDoLifecycleTest, however, does not 
currently allow for empty bundles. We have currently worked around this 
in Flink by avoiding the creation of these bundles, but maybe the test 
should be modified so that it adheres to the model [1].


Jan

[1] https://github.com/apache/beam/pull/9846

On 10/21/19 6:00 PM, Robert Bradshaw wrote:

Yes, the model allows them.

It also takes extra work to avoid them in general (e.g. imagine one
reshuffles N elements to M > N workers). A priori, one would "start" a
bundle and then try to read all data destined for that
worker -- postponing this until one knows that the set of data for this
worker is non-empty could be an optimization (as could not doing so, as a
form of speculative execution), but it should not be necessary.

- Robert

On Mon, Oct 21, 2019 at 7:03 AM Jan Lukavský  wrote:

Hi Max,

that is true, but then we have two orthogonal issues:

   a) correctness - if empty bundles are aligned with the model, then
validates runner tests should take that into account

   b) performance - that can be dealt with in separate JIRA issue, if needed

WDYT?

Jan

On 10/21/19 3:22 PM, Maximilian Michels wrote:

Hi Jan,

I think it is aligned with the model to create empty bundles. The
question, of course, is whether it is preferable to avoid them, since the
Setup/Finish state might be costly, depending on the bundle size and
the type of DoFn used.

Cheers,
Max

On 21.10.19 14:13, Kyle Weaver wrote:

Nevermind, this is discussed on the PR linked.

On Mon, Oct 21, 2019 at 2:11 PM Kyle Weaver  wrote:

 Do you know why an empty bundle might be created?

 On Mon, Oct 21, 2019 at 1:42 PM Jan Lukavský  wrote:

 Hi,

 when debugging a flaky ParDoLifecycleTest in FlinkRunner, I have
 found a
 situation, where Flink might create empty bundle - i.e. call
 @StartBundle immediately followed by @FinishBundle, with no
 elements
 inside the bundle. That is what breaks the ParDoLifecycleTest,
 because
 the test explicitly assumes, that the sequence of lifecycle
methods
 should be StartBundle -> Process Element -> Finish Bundle. It is
 easy to
 modify the test to accept situation of StartBundle ->
 FinishBundle with
 no elements ([1]), but the question is, is this allowed by the
 model? I
 think there is no reason not to be, but I'd like to be sure.

 Thanks,

Jan

 [1] https://github.com/apache/beam/pull/9841



Re: [DISCUSS] How to stop SdkWorker in SdkHarness

2019-10-21 Thread Luke Cwik
Approach 2 is currently the suggested approach[1] for DoFn's to shutdown.
Note that SDK harnesses can terminate instances any time they want and
start new instances anytime as well.

Why do you want to expose this logic so that Runners could control it?

1:
https://docs.google.com/document/d/1n6s3BOxOPct3uF4UgbbI9O9rpdiKWFH9R6mtVmR7xp0/edit#

On Mon, Oct 21, 2019 at 4:27 AM jincheng sun 
wrote:

> Hi,
> I found that `SdkHarness` does not stop the `SdkWorker` when it finishes.  We
> should add logic to stop the `SdkWorker` in `SdkHarness`.  More detail
> can be found in [1].
>
> There are two approaches to solve this issue:
>
> Approach 1:  We can add a Fn API for teardown purpose and the runner will
> teardown a specific bundle descriptor via this teardown Fn API during
> disposing.
> Approach 2: The control service termination could be seen as a signal and
> once SDK harness receives this signal, the teardown of the bundle
> descriptor will be performed.
>
> More detail can be found in [2].
>
> Regarding Approach 2, an SDK harness can be shared between multiple executable
> stages. The control service termination only occurs when all the executable
> stages sharing the same SDK harness have finished. This means that the teardown
> of DoFns may not be executed immediately after an executable stage
> finishes.
>
> So, I prefer Approach 1. Welcome any feedback :)
>
> Best,
> Jincheng
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker.py
> [2]
> https://docs.google.com/document/d/1sCgy9VQPf9zVXKRquK8P6N4x7aB62GEO8ozkujRSHZg/edit?usp=sharing
>


Re: Are empty bundles allowed by model?

2019-10-21 Thread Robert Bradshaw
Yes, the model allows them.

It also takes extra work to avoid them in general (e.g. imagine one
reshuffles N elements to M > N workers). A priori, one would "start" a
bundle and then try to read all data destined for that
worker -- postponing this until one knows that the set of data for this
worker is non-empty could be an optimization (as could not doing so, as a
form of speculative execution), but it should not be necessary.
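
A toy illustration of that trade-off (the names are made up; this is not
runner code): eagerly starting a bundle may produce an empty one, while
peeking at the data first avoids it at the cost of extra bookkeeping.

class NoOpDoFn(object):
    def start_bundle(self): pass
    def process(self, element): pass
    def finish_bundle(self): pass

def run_worker(elements_for_worker, dofn, eager=True):
    # eager=True: start the bundle before knowing whether any data will arrive,
    # which can yield StartBundle immediately followed by FinishBundle.
    # eager=False: postpone starting until data is known to be non-empty.
    if eager or elements_for_worker:
        dofn.start_bundle()
        for e in elements_for_worker:
            dofn.process(e)
        dofn.finish_bundle()

run_worker([], NoOpDoFn(), eager=True)   # produces an (allowed) empty bundle
run_worker([], NoOpDoFn(), eager=False)  # avoids the bundle entirely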

- Robert

On Mon, Oct 21, 2019 at 7:03 AM Jan Lukavský  wrote:
>
> Hi Max,
>
> that is true, but then we have two orthogonal issues:
>
>   a) correctness - if empty bundles are aligned with the model, then
> validates runner tests should take that into account
>
>   b) performance - that can be dealt with in separate JIRA issue, if needed
>
> WDYT?
>
> Jan
>
> On 10/21/19 3:22 PM, Maximilian Michels wrote:
> > Hi Jan,
> >
> > I think it is aligned with the model to create empty bundles. The
> > question if course, whether it is preferable to avoid them, since the
> > Setup/Finish state might be costly, depending on the bundle size and
> > the type of DoFn used.
> >
> > Cheers,
> > Max
> >
> > On 21.10.19 14:13, Kyle Weaver wrote:
> >> Nevermind, this is discussed on the PR linked.
> >>
> >> On Mon, Oct 21, 2019 at 2:11 PM Kyle Weaver  wrote:
> >>
> >> Do you know why an empty bundle might be created?
> >>
> >> On Mon, Oct 21, 2019 at 1:42 PM Jan Lukavský  wrote:
> >>
> >> Hi,
> >>
> >> when debugging a flaky ParDoLifecycleTest in FlinkRunner, I have
> >> found a
> >> situation, where Flink might create empty bundle - i.e. call
> >> @StartBundle immediately followed by @FinishBundle, with no
> >> elements
> >> inside the bundle. That is what breaks the ParDoLifecycleTest,
> >> because
> >> the test explicitly assumes, that the sequence of lifecycle
> >> methods
> >> should be StartBundle -> Process Element -> Finish Bundle. It is
> >> easy to
> >> modify the test to accept situation of StartBundle ->
> >> FinishBundle with
> >> no elements ([1]), but the question is, is this allowed by the
> >> model? I
> >> think there is no reason not to be, but I'd like to be sure.
> >>
> >> Thanks,
> >>
> >>Jan
> >>
> >> [1] https://github.com/apache/beam/pull/9841
> >>


Re: Are empty bundles allowed by model?

2019-10-21 Thread Jan Lukavský

Hi Max,

that is true, but then we have two orthogonal issues:

 a) correctness - if empty bundles are aligned with the model, then 
validates runner tests should take that into account


 b) performance - that can be dealt with in separate JIRA issue, if needed

WDYT?

Jan

On 10/21/19 3:22 PM, Maximilian Michels wrote:

Hi Jan,

I think it is aligned with the model to create empty bundles. The 
question, of course, is whether it is preferable to avoid them, since the
Setup/Finish state might be costly, depending on the bundle size and 
the type of DoFn used.


Cheers,
Max

On 21.10.19 14:13, Kyle Weaver wrote:

Nevermind, this is discussed on the PR linked.

On Mon, Oct 21, 2019 at 2:11 PM Kyle Weaver  wrote:


    Do you know why an empty bundle might be created?

    On Mon, Oct 21, 2019 at 1:42 PM Jan Lukavský  wrote:

    Hi,

    when debugging a flaky ParDoLifecycleTest in FlinkRunner, I have
    found a
    situation, where Flink might create empty bundle - i.e. call
    @StartBundle immediately followed by @FinishBundle, with no
    elements
    inside the bundle. That is what breaks the ParDoLifecycleTest,
    because
    the test explicitly assumes, that the sequence of lifecycle 
methods

    should be StartBundle -> Process Element -> Finish Bundle. It is
    easy to
    modify the test to accept situation of StartBundle ->
    FinishBundle with
    no elements ([1]), but the question is, is this allowed by the
    model? I
    think there is no reason not to be, but I'd like to be sure.

    Thanks,

   Jan

    [1] https://github.com/apache/beam/pull/9841



Re: Are empty bundles allowed by model?

2019-10-21 Thread Kyle Weaver
Do you know why an empty bundle might be created?

On Mon, Oct 21, 2019 at 1:42 PM Jan Lukavský  wrote:

> Hi,
>
> when debugging a flaky ParDoLifecycleTest in FlinkRunner, I have found a
> situation, where Flink might create empty bundle - i.e. call
> @StartBundle immediately followed by @FinishBundle, with no elements
> inside the bundle. That is what breaks the ParDoLifecycleTest, because
> the test explicitly assumes, that the sequence of lifecycle methods
> should be StartBundle -> Process Element -> Finish Bundle. It is easy to
> modify the test to accept situation of StartBundle -> FinishBundle with
> no elements ([1]), but the question is, is this allowed by the model? I
> think there is no reason not to be, but I'd like to be sure.
>
> Thanks,
>
>   Jan
>
> [1] https://github.com/apache/beam/pull/9841
>
>


Re: Are empty bundles allowed by model?

2019-10-21 Thread Kyle Weaver
Nevermind, this is discussed on the PR linked.

On Mon, Oct 21, 2019 at 2:11 PM Kyle Weaver  wrote:

> Do you know why an empty bundle might be created?
>
> On Mon, Oct 21, 2019 at 1:42 PM Jan Lukavský  wrote:
>
>> Hi,
>>
>> when debugging a flaky ParDoLifecycleTest in FlinkRunner, I have found a
>> situation, where Flink might create empty bundle - i.e. call
>> @StartBundle immediately followed by @FinishBundle, with no elements
>> inside the bundle. That is what breaks the ParDoLifecycleTest, because
>> the test explicitly assumes, that the sequence of lifecycle methods
>> should be StartBundle -> Process Element -> Finish Bundle. It is easy to
>> modify the test to accept situation of StartBundle -> FinishBundle with
>> no elements ([1]), but the question is, is this allowed by the model? I
>> think there is no reason not to be, but I'd like to be sure.
>>
>> Thanks,
>>
>>   Jan
>>
>> [1] https://github.com/apache/beam/pull/9841
>>
>>


Beam Dependency Check Report (2019-10-21)

2019-10-21 Thread Apache Jenkins Server

High Priority Dependency Updates Of Beam Python SDK:


  Dependency Name | Current Version | Latest Version | Current Release Date | Latest Release Date | JIRA Issue
  mock | 2.0.0 | 3.0.5 | 2019-05-20 | 2019-05-20 | BEAM-7369
  oauth2client | 3.0.0 | 4.1.3 | 2018-12-10 | 2018-12-10 | BEAM-6089
  Sphinx | 1.8.5 | 2.2.0 | 2019-05-20 | 2019-08-19 | BEAM-7370

High Priority Dependency Updates Of Beam Java SDK:

  Dependency Name | Current Version | Latest Version | Current Release Date | Latest Release Date | JIRA Issue
  com.github.ben-manes.versions:com.github.ben-manes.versions.gradle.plugin | 0.20.0 | 0.27.0 | 2019-02-11 | 2019-10-21 | BEAM-6645
  com.github.spotbugs:spotbugs | 3.1.12 | 4.0.0-beta4 | 2019-03-01 | 2019-09-18 | BEAM-7792
  com.github.spotbugs:spotbugs-annotations | 3.1.12 | 4.0.0-beta4 | 2019-03-01 | 2019-09-18 | BEAM-6951
  javax.servlet:javax.servlet-api | 3.1.0 | 4.0.1 | 2013-04-25 | 2018-04-20 | BEAM-5750
  org.conscrypt:conscrypt-openjdk | 1.1.3 | 2.2.1 | 2018-06-04 | 2019-08-08 | BEAM-5748
  org.eclipse.jetty:jetty-server | 9.2.10.v20150310 | 10.0.0-alpha0 | 2015-03-10 | 2019-07-11 | BEAM-5752
  org.eclipse.jetty:jetty-servlet | 9.2.10.v20150310 | 10.0.0-alpha0 | 2015-03-10 | 2019-07-11 | BEAM-5753
  Gradle | 5.2.1 -> 5.6.3 | 6.0-rc-1 | None | 2019-10-21 | BEAM-8002

 A dependency update is high priority if it satisfies one of the following criteria:

 - It has a major version update available, e.g. org.assertj:assertj-core 2.5.0 -> 3.10.0;
 - It is over 3 minor versions behind the latest version, e.g. org.tukaani:xz 1.5 -> 1.8;
 - The current version is more than 180 days older than the latest release, e.g. com.google.auto.service:auto-service 2014-10-24 -> 2017-12-11.

 In Beam, we make a best-effort attempt at keeping all dependencies up-to-date.
 In the future, issues will be filed and tracked for these automatically,
 but in the meantime you can search for existing issues or open a new one.

 For more information: Beam Dependency Guide
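
A rough sketch of how the criteria above could be checked in code (version
handling is simplified to the leading dotted integers, and the exact
boundaries are an assumption based on the examples given):

from datetime import date

def _parts(version):
    # Extract the leading dotted-integer part, e.g. "4.0.0-beta4" -> (4, 0, 0).
    nums = []
    for piece in version.split("."):
        digits = ""
        for ch in piece:
            if ch.isdigit():
                digits += ch
            else:
                break
        if not digits:
            break
        nums.append(int(digits))
        if digits != piece:   # stop at the first piece with a non-numeric suffix
            break
    return tuple(nums)

def is_high_priority(current, latest, current_release, latest_release):
    cur, new = _parts(current), _parts(latest)
    major_behind = len(cur) > 0 and len(new) > 0 and new[0] > cur[0]
    minor_behind = (len(cur) > 1 and len(new) > 1 and new[0] == cur[0]
                    and new[1] - cur[1] >= 3)  # boundary assumed from the xz 1.5 -> 1.8 example
    stale = (latest_release - current_release).days > 180
    return major_behind or minor_behind or stale

# Example row from the report: mock 2.0.0 -> 3.0.5 is a major version behind.
print(is_high_priority("2.0.0", "3.0.5", date(2019, 5, 20), date(2019, 5, 20)))  # True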

Are empty bundles allowed by model?

2019-10-21 Thread Jan Lukavský

Hi,

while debugging a flaky ParDoLifecycleTest in FlinkRunner, I found a 
situation where Flink might create an empty bundle - i.e. call 
@StartBundle immediately followed by @FinishBundle, with no elements 
inside the bundle. That is what breaks the ParDoLifecycleTest, because 
the test explicitly assumes that the sequence of lifecycle methods 
should be StartBundle -> Process Element -> Finish Bundle. It is easy to 
modify the test to accept a StartBundle -> FinishBundle sequence with 
no elements ([1]), but the question is: is this allowed by the model? I 
think there is no reason for it not to be, but I'd like to be sure.
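
For what it's worth, a DoFn whose bundle hooks are written defensively already
tolerates this; a minimal Python sketch (illustrative only - the test in
question is the Java ParDoLifecycleTest):

import logging
import apache_beam as beam

class BundleAwareDoFn(beam.DoFn):
    # An empty bundle (start_bundle immediately followed by finish_bundle,
    # with no process calls in between) is harmless for a DoFn like this.
    def start_bundle(self):
        self._count = 0

    def process(self, element):
        self._count += 1
        yield element

    def finish_bundle(self):
        if self._count == 0:
            logging.debug("empty bundle: no elements were processed")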


Thanks,

 Jan

[1] https://github.com/apache/beam/pull/9841



[DISCUSS] How to stop SdkWorker in SdkHarness

2019-10-21 Thread jincheng sun
Hi,
I found that `SdkHarness` does not stop the `SdkWorker` when it finishes.  We
should add logic to stop the `SdkWorker` in `SdkHarness`.  More detail
can be found in [1].

There are two approaches to solve this issue:

Approach 1:  We can add a Fn API for teardown purpose and the runner will
teardown a specific bundle descriptor via this teardown Fn API during
disposing.
Approach 2: The control service termination could be seen as a signal and
once SDK harness receives this signal, the teardown of the bundle
descriptor will be performed.

More detail can be found in [2].

Regarding Approach 2, an SDK harness can be shared between multiple executable
stages. The control service termination only occurs when all the executable
stages sharing the same SDK harness have finished. This means that the teardown
of DoFns may not be executed immediately after an executable stage
finishes.
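
To make the difference concrete, a highly simplified sketch (plain Python; the
class and method names are hypothetical, not the actual sdk_worker.py code) of
where the teardown hook would sit under each approach:

class BundleProcessorCache(object):
    """Hypothetical cache of bundle processors keyed by bundle descriptor id."""
    def __init__(self):
        self._processors = {}

    def register(self, descriptor_id, processor):
        self._processors[descriptor_id] = processor

    def teardown(self, descriptor_id):
        # Approach 1: called when the runner sends an explicit teardown request
        # for one bundle descriptor, right after that stage is disposed.
        processor = self._processors.pop(descriptor_id, None)
        if processor is not None:
            processor.teardown_dofns()

    def teardown_all(self):
        # Approach 2: called only when the control service terminates, i.e.
        # after all stages sharing this SDK harness have finished.
        for descriptor_id in list(self._processors):
            self.teardown(descriptor_id)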

So, I prefer Approach 1. Welcome any feedback :)

Best,
Jincheng

[1]
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker.py
[2]
https://docs.google.com/document/d/1sCgy9VQPf9zVXKRquK8P6N4x7aB62GEO8ozkujRSHZg/edit?usp=sharing