Re: Beam python sdk with gevent

2018-09-19 Thread Rakesh Kumar
We use gevent to make network calls in parallel. One of our transformation
methods calls internal services, and it issues several of those network
calls concurrently. Here is the code snippet:
/__init__.py
import gevent.monkey
gevent.monkey.patch_all()

/transform.py
from gevent import Greenlet
from gevent import joinall
def filter_out_invalid_users(events):
   key, user_id_data_pairs = events
   user_ids = [user_id for user_id, data in user_id_data_pairs]

   jobs = []
   id_chunks = utils.chunk_list_evenly(user_ids, BATCH_SIZE)
   for id_chunk in id_chunks:
      # _call_users_service makes the network call.
      jobs.append(Greenlet.spawn(_call_users_service, list(id_chunk)))

   # Increase the timeout based on the number of greenlets we are running,
   # to account for yielding among greenlets.
   join_timeout = GREENLET_TIMEOUT + len(jobs) * GREENLET_TIMEOUT * 0.1
   joinall(jobs, timeout=join_timeout)

   successful_jobs = [job for job in jobs if job.successful()]
   valid_user_ids = []
   for job in successful_jobs:
      network_response = job.get()
      valid_user_ids.append(network_response.user_id)
   yield valid_user_ids

def _call_users_service(user_ids):
   # make network call and return response
   ..
   ..
   return network_response
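
For comparison, the same fan-out can be sketched with a standard-library
thread pool instead of greenlets, which needs no monkey-patching. This is
only a sketch under assumptions: chunk_evenly, call_users_service, BATCH_SIZE
and TIMEOUT_SECONDS are hypothetical stand-ins for the internal helpers
above, the service stub just filters ids locally, and none of this has been
tried on the portable Flink runner.

```python
from concurrent.futures import ThreadPoolExecutor, wait

BATCH_SIZE = 2          # hypothetical chunk size
TIMEOUT_SECONDS = 5.0   # hypothetical overall timeout for all calls


def chunk_evenly(items, size):
    """Split items into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def call_users_service(user_ids):
    """Hypothetical stand-in for the network call: positive ids are valid."""
    return [uid for uid in user_ids if uid > 0]


def filter_out_invalid_users(element, call=call_users_service):
    key, user_id_data_pairs = element
    user_ids = [user_id for user_id, _ in user_id_data_pairs]
    chunks = chunk_evenly(user_ids, BATCH_SIZE)
    if not chunks:
        yield []
        return
    pool = ThreadPoolExecutor(max_workers=len(chunks))
    try:
        futures = [pool.submit(call, chunk) for chunk in chunks]
        done, _not_done = wait(futures, timeout=TIMEOUT_SECONDS)
    finally:
        # Do not block on calls that missed the timeout.
        pool.shutdown(wait=False)
    valid = []
    for future in futures:  # keep submission order
        if future in done and future.exception() is None:
            valid.extend(future.result())
    yield valid
```

Calls that fail or miss the timeout are skipped, which mirrors the
job.successful() filter in the gevent version.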

On Tue, Sep 18, 2018 at 7:07 PM Ahmet Altay  wrote:

> I am also not familiar with gevent. Could you explain what you are trying
> to do and how you plan to use gevent?
>
> On Tue, Sep 18, 2018 at 9:38 AM, Lukasz Cwik  wrote:
>
>> I don't think anyone has tried what you're doing. The code that you're
>> working with is very new.
>>
>> On Mon, Sep 17, 2018 at 5:02 PM Micah Wylde  wrote:
>>
>>> Hi all,
>>>
>>> We're using the Python SDK with the portable Flink runner and running
>>> into some problems integrating gevent. We're patching the gRPC runtime for
>>> gevent as described in [0] which allows pipelines to start and partially
>>> run. However the tasks produce a stream of gevent exceptions:
>>>
>>> Exception greenlet.error: error('cannot switch to a different thread',)
>>> in 'grpc._cython.cygrpc.run_loop' ignored
>>> Traceback (most recent call last):
>>>   File "src/gevent/event.py", line 240, in gevent._event.Event.wait
>>>   File "src/gevent/event.py", line 140, in
>>> gevent._event._AbstractLinkable._wait
>>>   File "src/gevent/event.py", line 117, in
>>> gevent._event._AbstractLinkable._wait_core
>>>   File "src/gevent/event.py", line 119, in
>>> gevent._event._AbstractLinkable._wait_core
>>>   File "src/gevent/_greenlet_primitives.py", line 59, in
>>> gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>   File "src/gevent/_greenlet_primitives.py", line 59, in
>>> gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>   File "src/gevent/_greenlet_primitives.py", line 63, in
>>> gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>   File "src/gevent/__greenlet_primitives.pxd", line 35, in
>>> gevent.__greenlet_primitives._greenlet_switch
>>> greenlet.error: cannot switch to a different thread
>>>
>>> and do not make any progress.
>>>
>>> Has anybody else successfully used the portable python sdk with gevent?
>>> Or is there a recommended alternative for doing async IO in python
>>> pipelines?
>>>
>>> [0] https://github.com/grpc/grpc/issues/4629#issuecomment-376962677
>>>
>>> Micah
>>>
>>
> --
Rakesh Kumar
Software Engineer




Re: Is Splittable DoFn suitable for fetch data from a socket server?

2018-09-19 Thread flyisland
Hi Lukasz,

This socket server is like an MQTT server: it has queues inside it, and the
client should ack each message.

> Is receiving and processing each message reliably important or is it ok
> to drop messages when things fail?
A: Reliability is important; no messages should be lost.

> Is there a message acknowledgement system or can you request a position
> within the message stream (e.g. send all messages from position X when
> connecting and if for whatever reason you need to reconnect you can say
> send messages from position X to replay past messages)?
A: The client should ack each message it receives, and the server deletes
each acked message. If the client fails and the server does not receive an
ack, the server re-sends the message once the client is online again.
There is no "position" concept like Kafka's.
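
The delivery contract described above (ack each message, unacked messages
are re-sent after a reconnect, no replayable position) can be modelled in a
few lines. This is a toy in-memory sketch, not the real server protocol:
AckingServer, deliver, ack and consume are hypothetical names. It only
illustrates that acking after processing means a crashed client loses
nothing.

```python
from collections import OrderedDict


class AckingServer:
    """Toy model: keeps every message until the client acks it."""

    def __init__(self, messages):
        self._pending = OrderedDict(enumerate(messages))

    def deliver(self):
        """Return all unacked (msg_id, payload) pairs, oldest first."""
        return list(self._pending.items())

    def ack(self, msg_id):
        """Delete an acked message; it will not be delivered again."""
        self._pending.pop(msg_id, None)


def consume(server, process, crash_after=None):
    """Process pending messages, acking each only after `process` returns."""
    for count, (msg_id, payload) in enumerate(server.deliver()):
        if crash_after is not None and count == crash_after:
            return  # simulated client crash: the rest stays unacked
        process(payload)
        server.ack(msg_id)


server = AckingServer(["a", "b", "c"])
seen = []
consume(server, seen.append, crash_after=2)  # crashes before handling "c"
consume(server, seen.append)                 # reconnect: only "c" is re-sent
```

If the client instead crashed between process and ack, the message would be
delivered twice, so this scheme is at-least-once rather than exactly-once.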


On Thu, Sep 20, 2018 at 4:27 AM Lukasz Cwik  wrote:

> Before getting into what you could use and the current state of
> SplittableDoFn and its supported features, I was wondering what reliability
> guarantees does the socket server have around messages?
>
> Is receiving and processing each message reliably important or is it ok to
> drop messages when things fail?
> Is there a message acknowledgement system or can you request a position
> within the message stream (e.g. send all messages from position X when
> connecting and if for whatever reason you need to reconnect you can say
> send messages from position X to replay past messages)?
>
>
>
>
> On Tue, Sep 18, 2018 at 5:00 PM flyisland  wrote:
>
>>
>> Hi Gurus,
>>
>> I'm trying to create an IO connector to fetch data from a socket server
>> from Beam, I'm new to Beam, but according to this blog <
>> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, it seems
>> that SDF is the recommended way to implement an IO connector now.
>>
>> This in-house socket server can accept multiple clients, but it only
>> sends messages to the first connected client; it sends messages to the
>> second client only after the first one disconnects.
>>
>> To understand the lifecycle of a DoFn, I've just created a very simple
>> DoFn subclass, call log.debug() in every method, and according to the
>> JavaDoc of DoFn.Setup(), "This is a good place to initialize transient
>> in-memory resources, such as network connections. The resources can then be
>> disposed in DoFn.Teardown." I guess I should create the connection to the
>> socket server in the setup() method.
>>
>> But based on the log messages below, even when the input PCollection has
>> only one element, Beam still creates multiple DemoIO instances and
>> invokes a different DemoIO instance after every checkpoint.
>>
>> I'm wondering:
>> 1. How could I let Beam create only one DemoIO instance, or at least use
>> the same instance constantly?
>> 2. Or should I use the Source API for such purpose?
>>
>> Thanks in advance.
>>
>> Logs:
>> 07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@60a58077->setup() is called!
>> 07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO -
>> First->getInitialRestriction() is called!
>> 07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@417eede1->setup() is called!
>> 07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
>> 07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
>> 07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0,
>> 9223372036854775807)->newTracker() is called!
>> 07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
>> called!
>> 07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
>> 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
>> 07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
>> 07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
>> 07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>> 2018-09-18T23:15:56.285Z -> 0 -> First
>> 07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
>> 07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
>> 2018-09-18T23:15:56.786Z -> 1 -> First
>> 07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2,
>> 9223372036854775807)->newTracker() is called!
>> 07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO -
>> org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
>> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
>> called!
>> 07:15:58:358 [direct-runner-worker] [DEB

Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Tyler Akidau
Thanks Davor, and congrats Kenn!

-Tyler

On Wed, Sep 19, 2018 at 2:43 PM Yifan Zou  wrote:

> Congratulations Kenn!
>
> On Wed, Sep 19, 2018 at 2:36 PM Robert Burke  wrote:
>
>> Congrats Kenn! :D
>>
>> On Wed, Sep 19, 2018, 2:21 PM Ismaël Mejía  wrote:
>>
>>> Congratulations and welcome Kenn as new chair!
>>> Thanks Davor for your hard work too.
>>>
>>> On Wed, Sep 19, 2018 at 11:14 PM Rui Wang  wrote:
>>>
 Congrats!

 -Rui

 On Wed, Sep 19, 2018 at 2:12 PM Chamikara Jayalath <
 chamik...@google.com> wrote:

> Congrats!
>
> On Wed, Sep 19, 2018 at 2:05 PM Ahmet Altay  wrote:
>
>> Congratulations, Kenn! And thank you Davor.
>>
>> On Wed, Sep 19, 2018 at 1:44 PM, Anton Kedin 
>> wrote:
>>
>>> Congrats!
>>>
>>> On Wed, Sep 19, 2018 at 1:36 PM Ankur Goenka 
>>> wrote:
>>>
 Congrats Kenn!

 On Wed, Sep 19, 2018 at 1:35 PM Amit Sela 
 wrote:

> Well deserved! Congrats Kenn.
>
> On Wed, Sep 19, 2018 at 4:25 PM Kai Jiang 
> wrote:
>
>> Congrats, Kenn!
>>
>> On Wed, Sep 19, 2018 at 1:23 PM Alan Myrvold 
>> wrote:
>>
>>> Congrats, Kenn.
>>>
>>> On Wed, Sep 19, 2018 at 1:08 PM Maximilian Michels <
>>> m...@apache.org> wrote:
>>>
 Congrats!

 On 19.09.18 22:07, Robin Qiu wrote:
 > Congratulations, Kenn!
 >
 > On Wed, Sep 19, 2018 at 1:05 PM Lukasz Cwik wrote:
 >
 > Congrats Kenn.
 >
 > On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci <da...@apache.org> wrote:
 >
 > Hi everyone --
 > It is with great pleasure that I announce that at today's
 > meeting of the Foundation's Board of Directors, the Board has
 > appointed Kenneth Knowles as the second chair of the Apache Beam
 > project.
 >
 > Kenn has served on the PMC since its inception, and is very
 > active and effective in growing the community. His exemplary
 > posts have been cited in other projects. I'm super happy to have
 > Kenn accepted the nomination, and I'm confident that he'll serve
 > with distinction.
 >
 > As for myself, I'm not going anywhere. I'm still around and will
 > be as active as I have recently been. Thrilled to be able to
 > pass the baton to such a key member of this community and to
 > have less administrative work to do ;-).
 >
 > Please join me in welcoming Kenn to his new role, and I ask that
 > you support him as much as possible. As always, please let me
 > know if you have any questions.
 >
 > Davor
 >

>>>
>>


Re: Rethinking Timers as PCollections

2018-09-19 Thread Robert Bradshaw
On Wed, Sep 19, 2018 at 11:54 PM Lukasz Cwik  wrote:

>
> On Wed, Sep 19, 2018 at 2:46 PM Robert Bradshaw 
> wrote:
>
>> On Wed, Sep 19, 2018 at 8:31 PM Lukasz Cwik  wrote:
>>
>>> *How does modelling a timer as a PCollection help the Beam model?*
>>>
>>> The largest concern was about how to model timers within Apache Beam
>>> that:
>>> 1) removed the need for the watermark hold that is typically accompanied
>>> with state/timer implementations
>>> 2) enabled the ability to set the explicit output time to be independent
>>> of the firing time for all timer specifications [1]
>>>
>>> I felt as though treating timers as a self-loop around the ParDo
>>> PTransform allowed us to use the natural definition of output watermark =
>>> min(all input watermarks) as a way to define how timers hold output and
>>> using windowed values that contained timers as a natural way to represent
>>> the output time to be independent of the firing time. The purpose of the
>>> PCollection right now is to store the representation of how timers are
>>> encoded. I suspect that at some point in time we will have different timer
>>> encodings.
>>>
>>
>> I agree that being able to separate the hold time from firing time of a
>> timer is important, but in retrospect don't think timers as PCollections is
>> the only (or most natural) way to represent that (in the model or in runner
>> implementations).
>>
> Can you go into more detail as to what you're suggesting as the replacement
> and why you believe it fits the model more naturally since "state" doesn't
> have watermarks or produce output but timers can. I'm not disagreeing that
> timers as PCollections may not be a natural fit but I don't see them as
> state as well since "user state" doesn't produce output.
>

Yeah, timers are their own thing. They come in like data, but are written
out like state.

I guess the high level is that I think the beam graph should represent,
within reason, the user's model of what their pipeline is, and treating
timers as PCollections with this self-loop feels like an implementation
detail, and furthermore an implementation detail that no runner is actually
going to use to implement things. And (again, this is subjective) seems to
complicate both the reasoning about a pipeline and implementing its
execution over viewing the stateful/timely aspects of a DoFn as internal
details to the ParDo operation.



>>> Having this fit well with how timers are delivered between the SDK and
>>> Runner was an added bonus. Also, a good portion of the code that I needed
>>> to fix up was more related to the assumption that there was ever only a
>>> single input producer to an executable stage and plumbing of timer
>>> specifications through all the runner library support layers.
>>>
>>> --
>>> *There is no "clear" for timers.*
>>>
>>> The current Java API for timers only allows you to set them. Clearing
>>> timers is not exposed to users and is only used by internal implementations
>>> to support runners[2] via TimerInternals. Usage of a timer is like so:
>>>   @TimerId("timer")
>>>   private final TimerSpec timerSpec =
>>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>
>>>   @ProcessElement
>>>   public void process(
>>>       ProcessContext context,
>>>       BoundedWindow window,
>>>       @TimerId("timer") Timer myTimer) {
>>>     myTimer.set(window.maxTimestamp().plus(allowedLateness));
>>>   }
>>>
>>
>> We'll probably want clear. But currently there's already exactly one
>> timer per window per key, and setting another one overwrites the previous
>> one, again making it more like state. Maybe, as you said, it could involve
>> retractions (but every output being a retraction seems odd.)
>>
> Once retractions exist, most GBK firings will have a preceding retraction
> so I believe they will be very common.
>

True, but I don't think we want to insert the GBK + CV in the graph to
represent the consolidation that's going on here.



>>> * side inputs already require a runner to introspect the ParDo payload to
>>> get the SideInputSpec, requiring it to have knowledge of the TimerSpec is
>>> no different.
>>>
>>
>> My point was that once it has knowledge of the TimerSpec, there is no need
>> for (meaning no additional information provided by) the timer PCollection
>> nor its edges.
>>
> The way in which the timer is encoded is missing. This could be explicit
> on the TimerSpec like the other StateSpec definitions though.
>

Ah, I didn't realize there was a choice in the matter.


On Wed, Sep 19, 2018 at 11:57 PM Reuven Lax  wrote:

>
>> We'll probably want clear. But currently there's already exactly one
>> timer per window per key, and setting another one overwrites the previous
>> one, again making it more like state. Maybe, as you said, it could involve
>> retractions (but every output being a retraction seems odd.)
>>
>
> This is not true. We support multiple (tagged) timers per key.
>

Yeah, I misspoke. I meant every distinct (tagged) timer has one firing
time, ra

Re: Rethinking Timers as PCollections

2018-09-19 Thread Reuven Lax
On Wed, Sep 19, 2018 at 2:46 PM Robert Bradshaw  wrote:

> On Wed, Sep 19, 2018 at 8:31 PM Lukasz Cwik  wrote:
>
>> *How does modelling a timer as a PCollection help the Beam model?*
>>
>> The largest concern was about how to model timers within Apache Beam that:
>> 1) removed the need for the watermark hold that is typically accompanied
>> with state/timer implementations
>> 2) enabled the ability to set the explicit output time to be independent
>> of the firing time for all timer specifications [1]
>>
>> I felt as though treating timers as a self-loop around the ParDo
>> PTransform allowed us to use the natural definition of output watermark =
>> min(all input watermarks) as a way to define how timers hold output and
>> using windowed values that contained timers as a natural way to represent
>> the output time to be independent of the firing time. The purpose of the
>> PCollection right now is to store the representation of how timers are
>> encoded. I suspect that at some point in time we will have different timer
>> encodings.
>>
>
> I agree that being able to separate the hold time from firing time of a
> timer is important, but in retrospect don't think timers as PCollections is
> the only (or most natural) way to represent that (in the model or in runner
> implementations).
>
>
>> Having this fit well with how timers are delivered between the SDK and
>> Runner was an added bonus. Also, a good portion of the code that I needed
>> to fix up was more related to the assumption that there was ever only a
>> single input producer to an executable stage and plumbing of timer
>> specifications through all the runner library support layers.
>>
>> --
>> *There is no "clear" for timers.*
>>
>> The current Java API for timers only allows you to set them. Clearing
>> timers is not exposed to users and is only used by internal implementations
>> to support runners[2] via TimerInternals. Usage of a timer is like so:
>>   @TimerId("timer")
>>   private final TimerSpec timerSpec =
>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>>   @ProcessElement
>>   public void process(
>>       ProcessContext context,
>>       BoundedWindow window,
>>       @TimerId("timer") Timer myTimer) {
>>     myTimer.set(window.maxTimestamp().plus(allowedLateness));
>>   }
>>
>
> We'll probably want clear. But currently there's already exactly one timer
> per window per key, and setting another one overwrites the previous one,
> again making it more like state. Maybe, as you said, it could involve
> retractions (but every output being a retraction seems odd.)
>

This is not true. We support multiple (tagged) timers per key.
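
The distinction can be sketched as a small store in which each (key, window,
timer tag) triple holds at most one firing time: setting the same tag again
overwrites it, while different tags on the same key coexist. This is an
illustrative model only; TimerStore and its methods are hypothetical names,
not Beam APIs.

```python
class TimerStore:
    """Toy model of per-key, per-window, per-tag timers."""

    def __init__(self):
        self._timers = {}  # (key, window, tag) -> firing timestamp

    def set(self, key, window, tag, fire_at):
        """Set or overwrite the single firing time for this tagged timer."""
        self._timers[(key, window, tag)] = fire_at

    def fire_ready(self, watermark):
        """Remove and return timers whose firing time is <= watermark."""
        ready = sorted(
            (fire_at, ident)
            for ident, fire_at in self._timers.items()
            if fire_at <= watermark
        )
        for _, ident in ready:
            del self._timers[ident]
        return [(ident, fire_at) for fire_at, ident in ready]


store = TimerStore()
store.set("user1", "w0", "gc", fire_at=10)
store.set("user1", "w0", "gc", fire_at=20)     # overwrites the first "gc" timer
store.set("user1", "w0", "flush", fire_at=15)  # a second tag on the same key
fired = store.fire_ready(watermark=15)         # only "flush" is due
```

The overwritten "gc" timer at 10 never fires; only its replacement at 20
remains pending.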


>
>
>>
>> -
>> I'm not a big fan of having timers as a separate field in the elements
>> proto. I still think they should be treated as an input/output and we could
>> update the representation so that inputs/outputs for PTransforms don't need
>> to be "PCollections". I was thinking that our current PCollection
>> representation assumes that we'll never want to change it to add extra
>> information or do backwards incompatible changes like beam:pcollection:v2.
>>
>
> If the data never travels from one PTransform to another, but always goes
> directly to/from the runner harness, I think using explicit channels to
> communicate this information in the fn api makes more sense than
> complicating the graph with special types of PCollections. This is
> consistent with how we do side inputs and state, and I think more
> consistent with the DAG a user has in their head when writing a pipeline.
> (And I could also see speculatively pushing state or side input information
> in the data channel too.)
>
> Especially when writing, they feel a lot more like they belong to state.
> And it could make sense to try to read unfired timers as well.
>
>
>>
>> -
>> Other points:
>> * side inputs already require a runner to introspect the ParDo payload to
>> get the SideInputSpec, requiring it to have knowledge of the TimerSpec is
>> no different.
>>
>
> My point was that once it has knowledge of the TimerSpec, there is no need
> for (meaning no additional information provided by) the timer PCollection
> nor its edges.
>
>
>> * multimap side input over timers where the key is the key that the timer
>> is associated with. iterable side input over timers would allow you to
>> iterate over  pairs. This could be useful for skew control in
>> sources since they would want to know how far they are ahead vs other
>> restrictions.
>> * user state as a PCollection can make sense but I can't see how we can
>> get past problems when we treat it as an "input" since the input watermark
>> would be ignored or infinity?. I do agree that this could open the door to
>> sharing "state" such as multi-key transactions but very speculative as you
>> say.
>>
>>
>> 1: https://issues.apache.org/jira/browse/BEAM-2535
>> 2:
>> https://github.com/apache/beam/search?q=%22org.apache.beam.sdk.state.Timers%22&unscoped_q=%22org.apache.beam.sdk.st

Re: Migrating Beam SQL to Calcite's code generation

2018-09-19 Thread Rui Wang
This is such an exciting change!

Since we cannot mix the current implementation with Calcite code generation,
are there any cases that our current implementation supports but Calcite
code generation does not, such that switching would affect existing usage?

-Rui

On Wed, Sep 19, 2018 at 11:53 AM Andrew Pilloud  wrote:

> To follow up on this, the PR is now in a reviewable state and I've added
> more tests for FLOOR and CEIL. Both work with a more extensive set of
> arguments after this change. There are now 4 outstanding calcite PRs that
> get all the tests passing.
>
> Unfortunately there is no easy way to mix our current implementation and
> using Calcite's code generator.
>
> Andrew
>
> On Mon, Sep 17, 2018 at 3:22 PM Mingmin Xu  wrote:
>
>> Awesome work, we should call Calcite operator functions if available.
>>
>> I haven't had time to read the PR yet; for those impacted, we would keep
>> the existing implementation. One example: I noticed recently that
>> FLOOR/CEIL only supports months/years, which was quite a surprise to me.
>>
>> Mingmin
>>
>> On Mon, Sep 17, 2018 at 3:03 PM Anton Kedin  wrote:
>>
>>> This is pretty amazing! Thank you for doing this!
>>>
>>> Regards,
>>> Anton
>>>
>>> On Mon, Sep 17, 2018 at 2:27 PM Andrew Pilloud 
>>> wrote:
>>>
 I've adapted Calcite's EnumerableCalc code generation to generate the
 BeamCalc DoFn. The primary purpose behind this change is so we can take
 advantage of Calcite's extensive SQL operator implementation. This deletes
 ~11000 lines of code from Beam (with ~350 added), significantly increases
 the set of supported SQL operators, and improves performance and
 correctness of currently supported operators. Here is my work in progress:
 https://github.com/apache/beam/pull/6417

 There are a few bugs in Calcite that this has exposed:

 Fixed in Calcite master:

    - CALCITE-2321 - The type of a union of CHAR columns of different
      lengths should be VARCHAR
    - CALCITE-2447 - Some POWER, ATAN2 functions fail with
      NoSuchMethodException

 Pending PRs:

    - CALCITE-2529 - linq4j should promote integer to floating point when
      generating function calls
    - CALCITE-2530 - TRIM function does not throw exception when the length
      of trim character is not 1(one)

 More work:

    - CALCITE-2404 - Accessing structured-types is not implemented by the
      runtime
    - (none yet) - Support multi character TRIM extension in Calcite

 I would like to push these changes in with these minor regressions. Do
 any of these Calcite bugs block this functionality being added to Beam?

 Andrew

>>>
>>
>> --
>> 
>> Mingmin
>>
>


Re: Rethinking Timers as PCollections

2018-09-19 Thread Lukasz Cwik
On Wed, Sep 19, 2018 at 2:46 PM Robert Bradshaw  wrote:

> On Wed, Sep 19, 2018 at 8:31 PM Lukasz Cwik  wrote:
>
>> *How does modelling a timer as a PCollection help the Beam model?*
>>
>> The largest concern was about how to model timers within Apache Beam that:
>> 1) removed the need for the watermark hold that is typically accompanied
>> with state/timer implementations
>> 2) enabled the ability to set the explicit output time to be independent
>> of the firing time for all timer specifications [1]
>>
>> I felt as though treating timers as a self-loop around the ParDo
>> PTransform allowed us to use the natural definition of output watermark =
>> min(all input watermarks) as a way to define how timers hold output and
>> using windowed values that contained timers as a natural way to represent
>> the output time to be independent of the firing time. The purpose of the
>> PCollection right now is to store the representation of how timers are
>> encoded. I suspect that at some point in time we will have different timer
>> encodings.
>>
>
> I agree that being able to separate the hold time from firing time of a
> timer is important, but in retrospect don't think timers as PCollections is
> the only (or most natural) way to represent that (in the model or in runner
> implementations).
>
Can you go into more detail as to what you're suggesting as the replacement
and why you believe it fits the model more naturally since "state" doesn't
have watermarks or produce output but timers can. I'm not disagreeing that
timers as PCollections may not be a natural fit but I don't see them as
state as well since "user state" doesn't produce output.


>
>
>> Having this fit well with how timers are delivered between the SDK and
>> Runner was an added bonus. Also, a good portion of the code that I needed
>> to fix up was more related to the assumption that there was ever only a
>> single input producer to an executable stage and plumbing of timer
>> specifications through all the runner library support layers.
>>
>> --
>> *There is no "clear" for timers.*
>>
>> The current Java API for timers only allows you to set them. Clearing
>> timers is not exposed to users and is only used by internal implementations
>> to support runners[2] via TimerInternals. Usage of a timer is like so:
>>   @TimerId("timer")
>>   private final TimerSpec timerSpec =
>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>>   @ProcessElement
>>   public void process(
>>       ProcessContext context,
>>       BoundedWindow window,
>>       @TimerId("timer") Timer myTimer) {
>>     myTimer.set(window.maxTimestamp().plus(allowedLateness));
>>   }
>>
>
> We'll probably want clear. But currently there's already exactly one timer
> per window per key, and setting another one overwrites the previous one,
> again making it more like state. Maybe, as you said, it could involve
> retractions (but every output being a retraction seems odd.)
>
Once retractions exist, most GBK firings will have a preceding retraction
so I believe they will be very common.

>
>
>>
>> -
>> I'm not a big fan of having timers as a separate field in the elements
>> proto. I still think they should be treated as an input/output and we could
>> update the representation so that inputs/outputs for PTransforms don't need
>> to be "PCollections". I was thinking that our current PCollection
>> representation assumes that we'll never want to change it to add extra
>> information or do backwards incompatible changes like beam:pcollection:v2.
>>
>
> If the data never travels from one PTransform to another, but always goes
> directly to/from the runner harness, I think using explicit channels to
> communicate this information in the fn api makes more sense than
> complicating the graph with special types of PCollections. This is
> consistent with how we do side inputs and state, and I think more
> consistent with the DAG a user has in their head when writing a pipeline.
> (And I could also see speculatively pushing state or side input information
> in the data channel too.)
>
> Especially when writing, they feel a lot more like they belong to state.
> And it could make sense to try to read unfired timers as well.
>
>
>>
>> -
>> Other points:
>> * side inputs already require a runner to introspect the ParDo payload to
>> get the SideInputSpec, requiring it to have knowledge of the TimerSpec is
>> no different.
>>
>
> My point was that once it has knowledge of the TimerSpec, there is no need
> for (meaning no additional information provided by) the timer PCollection
> nor its edges.
>
The way in which the timer is encoded is missing. This could be explicit on
the TimerSpec like the other StateSpec definitions though.


>
>> * multimap side input over timers where the key is the key that the timer
>> is associated with. iterable side input over timers would allow you to
>> iterate over  pairs. This could be useful for skew control in
>> sources since they would want to know how far 

Re: Rethinking Timers as PCollections

2018-09-19 Thread Robert Bradshaw
On Wed, Sep 19, 2018 at 8:31 PM Lukasz Cwik  wrote:

> *How does modelling a timer as a PCollection help the Beam model?*
>
> The largest concern was about how to model timers within Apache Beam that:
> 1) removed the need for the watermark hold that is typically accompanied
> with state/timer implementations
> 2) enabled the ability to set the explicit output time to be independent
> of the firing time for all timer specifications [1]
>
> I felt as though treating timers as a self-loop around the ParDo
> PTransform allowed us to use the natural definition of output watermark =
> min(all input watermarks) as a way to define how timers hold output and
> using windowed values that contained timers as a natural way to represent
> the output time to be independent of the firing time. The purpose of the
> PCollection right now is to store the representation of how timers are
> encoded. I suspect that at some point in time we will have different timer
> encodings.
>

I agree that being able to separate the hold time from firing time of a
timer is important, but in retrospect don't think timers as PCollections is
the only (or most natural) way to represent that (in the model or in runner
implementations).
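
To make the quoted watermark argument concrete: if pending timers are treated
as one more input to the ParDo, the usual rule output watermark = min(all
input watermarks) holds the output back without a separate watermark-hold
mechanism. The sketch below is a toy calculation under that assumption;
output_watermark and its arguments are hypothetical names, and the timer
input's watermark is taken to be the earliest pending output timestamp.

```python
import math


def output_watermark(input_watermarks, pending_timer_output_times):
    """Min over the regular inputs and the timer self-loop input.

    The self-loop's watermark is modelled as the earliest output timestamp
    among pending timers, or +infinity when no timers are pending.
    """
    timer_watermark = min(pending_timer_output_times, default=math.inf)
    return min(list(input_watermarks) + [timer_watermark])


# Upstream has advanced to 100, but a pending timer will still emit at 40.
held = output_watermark([100, 120], pending_timer_output_times=[40, 75])
# After both timers fire, only the regular inputs constrain the output.
released = output_watermark([100, 120], pending_timer_output_times=[])
```

Because the held value depends on the timer's output time rather than its
firing time, the two can be set independently in this model.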


> Having this fit well with how timers are delivered between the SDK and
> Runner was an added bonus. Also, a good portion of the code that I needed
> to fix up was more related to the assumption that there was ever only a
> single input producer to an executable stage and plumbing of timer
> specifications through all the runner library support layers.
>
> --
> *There is no "clear" for timers.*
>
> The current Java API for timers only allows you to set them. Clearing
> timers is not exposed to users and is only used by internal implementations
> to support runners[2] via TimerInternals. Usage of a timer is like so:
>   @TimerId("timer")
>   private final TimerSpec timerSpec =
>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>   @ProcessElement
>   public void process(
>       ProcessContext context,
>       BoundedWindow window,
>       @TimerId("timer") Timer myTimer) {
>     myTimer.set(window.maxTimestamp().plus(allowedLateness));
>   }
>

We'll probably want clear. But currently there's already exactly one timer
per window per key, and setting another one overwrites the previous one,
again making it more like state. Maybe, as you said, it could involve
retractions (but every output being a retraction seems odd.)


>
> -
> I'm not a big fan of having timers as a separate field in the elements
> proto. I still think they should be treated as an input/output and we could
> update the representation so that inputs/outputs for PTransforms don't need
> to be "PCollections". I was thinking that our current PCollection
> representation assumes that we'll never want to change it to add extra
> information or do backwards incompatible changes like beam:pcollection:v2.
>

If the data never travels from one PTransform to another, but always goes
directly to/from the runner harness, I think using explicit channels to
communicate this information in the fn api makes more sense than
complicating the graph with special types of PCollections. This is
consistent with how we do side inputs and state, and I think more
consistent with the DAG a user has in their head when writing a pipeline.
(And I could also see speculatively pushing state or side input information
in the data channel too.)

Especially when writing, they feel a lot more like they belong to state.
And it could make sense to try to read unfired timers as well.


>
> -
> Other points:
> * side inputs already require a runner to introspect the ParDo payload to
> get the SideInputSpec, requiring it to have knowledge of the TimerSpec is
> no different.
>

My point was that once it has knowledge of the TimerSpec, there is no need
for (meaning no additional information is provided by) the timer PCollection
or its edges.


> * multimap side input over timers where the key is the key that the timer
> is associated with. iterable side input over timers would allow you to
> iterate over  pairs. This could be useful for skew control in
> sources since they would want to know how far they are ahead vs other
> restrictions.
> * user state as a PCollection can make sense but I can't see how we can
> get past problems when we treat it as an "input" since the input watermark
> would be ignored or infinity?. I do agree that this could open the door to
> sharing "state" such as multi-key transactions but very speculative as you
> say.
>
>
> 1: https://issues.apache.org/jira/browse/BEAM-2535
> 2:
> https://github.com/apache/beam/search?q=%22org.apache.beam.sdk.state.Timers%22&unscoped_q=%22org.apache.beam.sdk.state.Timers%22
>
> On Wed, Sep 19, 2018 at 6:28 AM Thomas Weise  wrote:
>
>> Robert,
>>
>> Thanks for presenting these thoughts. Your attempt to implement the timer
>> support in the Python runner is the first strong signal we have and it is
>> the right ti

Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Yifan Zou
Congratulations Kenn!

On Wed, Sep 19, 2018 at 2:36 PM Robert Burke  wrote:

> Congrats Kenn! :D
>
> On Wed, Sep 19, 2018, 2:21 PM Ismaël Mejía  wrote:
>
>> Congratulations and welcome Kenn as new chair!
>> Thanks Davor for your hard work too.
>>
>> On Wed, Sep 19, 2018 at 11:14 PM Rui Wang  wrote:
>>
>>> Congrats!
>>>
>>> -Rui
>>>
>>> On Wed, Sep 19, 2018 at 2:12 PM Chamikara Jayalath 
>>> wrote:
>>>
 Congrats!

 On Wed, Sep 19, 2018 at 2:05 PM Ahmet Altay  wrote:

> Congratulations, Kenn! And thank you Davor.
>
> On Wed, Sep 19, 2018 at 1:44 PM, Anton Kedin  wrote:
>
>> Congrats!
>>
>> On Wed, Sep 19, 2018 at 1:36 PM Ankur Goenka 
>> wrote:
>>
>>> Congrats Kenn!
>>>
>>> On Wed, Sep 19, 2018 at 1:35 PM Amit Sela 
>>> wrote:
>>>
 Well deserved! Congrats Kenn.

 On Wed, Sep 19, 2018 at 4:25 PM Kai Jiang 
 wrote:

> Congrats, Kenn!
>
> On Wed, Sep 19, 2018 at 1:23 PM Alan Myrvold 
> wrote:
>
>> Congrats, Kenn.
>>
>> On Wed, Sep 19, 2018 at 1:08 PM Maximilian Michels <m...@apache.org> wrote:
>>
>>> Congrats!
>>>
>>> On 19.09.18 22:07, Robin Qiu wrote:
>>> > Congratulations, Kenn!
>>> >
>>> > On Wed, Sep 19, 2018 at 1:05 PM Lukasz Cwik wrote:
>>> >
>>> > Congrats Kenn.
>>> >
>>> > On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci <da...@apache.org> wrote:
>>> >
>>> > Hi everyone --
>>> > It is with great pleasure that I announce that at
>>> today's
>>> > meeting of the Foundation's Board of Directors, the
>>> Board has
>>> > appointed Kenneth Knowles as the second chair of the
>>> Apache Beam
>>> > project.
>>> >
>>> > Kenn has served on the PMC since its inception, and is
>>> very
>>> > active and effective in growing the community. His
>>> exemplary
>>> > posts have been cited in other projects. I'm super
>>> happy to have
>>> > Kenn accepted the nomination, and I'm confident that
>>> he'll serve
>>> > with distinction.
>>> >
>>> > As for myself, I'm not going anywhere. I'm still
>>> around and will
>>> > be as active as I have recently been. Thrilled to be
>>> able to
>>> > pass the baton to such a key member of this community
>>> and to
>>> > have less administrative work to do ;-).
>>> >
>>> > Please join me in welcoming Kenn to his new role, and
>>> I ask that
>>> > you support him as much as possible. As always, please
>>> let me
>>> > know if you have any questions.
>>> >
>>> > Davor
>>> >
>>>
>>
>


Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Robert Burke
Congrats Kenn! :D

On Wed, Sep 19, 2018, 2:21 PM Ismaël Mejía  wrote:

> Congratulations and welcome Kenn as new chair!
> Thanks Davor for your hard work too.
>
> On Wed, Sep 19, 2018 at 11:14 PM Rui Wang  wrote:
>
>> Congrats!
>>
>> -Rui
>>
>> On Wed, Sep 19, 2018 at 2:12 PM Chamikara Jayalath 
>> wrote:
>>
>>> Congrats!
>>>
>>> On Wed, Sep 19, 2018 at 2:05 PM Ahmet Altay  wrote:
>>>
 Congratulations, Kenn! And thank you Davor.

 On Wed, Sep 19, 2018 at 1:44 PM, Anton Kedin  wrote:

> Congrats!
>
> On Wed, Sep 19, 2018 at 1:36 PM Ankur Goenka 
> wrote:
>
>> Congrats Kenn!
>>
>> On Wed, Sep 19, 2018 at 1:35 PM Amit Sela 
>> wrote:
>>
>>> Well deserved! Congrats Kenn.
>>>
>>> On Wed, Sep 19, 2018 at 4:25 PM Kai Jiang 
>>> wrote:
>>>
 Congrats, Kenn!

 On Wed, Sep 19, 2018 at 1:23 PM Alan Myrvold 
 wrote:

> Congrats, Kenn.
>
> On Wed, Sep 19, 2018 at 1:08 PM Maximilian Michels 
> wrote:
>
>> Congrats!
>>
>> On 19.09.18 22:07, Robin Qiu wrote:
>> > Congratulations, Kenn!
>> >
>> > On Wed, Sep 19, 2018 at 1:05 PM Lukasz Cwik wrote:
>> >
>> > Congrats Kenn.
>> >
>> > On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci <da...@apache.org> wrote:
>> >
>> > Hi everyone --
>> > It is with great pleasure that I announce that at
>> today's
>> > meeting of the Foundation's Board of Directors, the
>> Board has
>> > appointed Kenneth Knowles as the second chair of the
>> Apache Beam
>> > project.
>> >
>> > Kenn has served on the PMC since its inception, and is
>> very
>> > active and effective in growing the community. His
>> exemplary
>> > posts have been cited in other projects. I'm super
>> happy to have
>> > Kenn accepted the nomination, and I'm confident that
>> he'll serve
>> > with distinction.
>> >
>> > As for myself, I'm not going anywhere. I'm still around
>> and will
>> > be as active as I have recently been. Thrilled to be
>> able to
>> > pass the baton to such a key member of this community
>> and to
>> > have less administrative work to do ;-).
>> >
>> > Please join me in welcoming Kenn to his new role, and I
>> ask that
>> > you support him as much as possible. As always, please
>> let me
>> > know if you have any questions.
>> >
>> > Davor
>> >
>>
>



Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

2018-09-19 Thread Lukasz Cwik
If we don't hear much from users, I would be for merging the change as long
as it is marked @Experimental until we get future feedback on its usage.

On Wed, Sep 19, 2018 at 2:19 PM Jeff Klukas  wrote:

> Thanks for the thoughts, Lukasz. These are exactly the kinds of issues I
> was hoping to get context on, since I don't yet have extensive experience
> with beam.
>
> I have not yet run into issues where the output coder was not able to be
> inferred. I expect this may be a non-issue, as the individual transforms
> used within a user-provided lambda expression would presumably expose the
> ability to specify a coder.
>
> I don't have enough context yet to comment on whether display data might
> be an issue, so I do hope the user list can provide input there.
>
> On Wed, Sep 19, 2018 at 4:52 PM Lukasz Cwik  wrote:
>
>> Thanks for the proposal and it does seem to make the API cleaner to build
>> anonymous composite transforms.
>>
>> In your experience have you had issues where the API doesn't work out
>> well because the PTransform:
>> * is not able to override how the output coder is inferred?
>> * can't supply display data?
>>
>> +u...@beam.apache.org , do users think that the
>> provided API would be useful enough for it to be added to the core SDK or
>> would the addition of the method provide noise/detract from the existing
>> API?
>>
>> On Mon, Sep 17, 2018 at 12:57 PM Jeff Klukas  wrote:
>>
>>> I've gone ahead and filed a JIRA Issue and GitHub PR to follow up on
>>> this suggestion and make it more concrete:
>>>
>>> https://issues.apache.org/jira/browse/BEAM-5413
>>> https://github.com/apache/beam/pull/6414
>>>
>>> On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas  wrote:
>>>
 Hello all, I'm a data engineer at Mozilla working on a first project
 using Beam. I've been impressed with the usability of the API as there are
 good built-in solutions for handling many simple transformation cases with
 minimal code, and wanted to discuss one bit of ergonomics that seems to be
 missing.

 It appears that none of the existing PTransform factories are generic
 enough to take in or output a PCollectionTuple, but we've found many use
 cases where it's convenient to apply a few transforms on a PCollectionTuple
 in a lambda expression.

 For example, we've defined several PTransforms that return main and
 error output stream bundled in a PCollectionTuple. We defined a
 CompositeTransform interface so that we could handle the error output in a
 lambda expression like:

 pipeline
     .apply("attempt to deserialize messages", new MyDeserializationTransform())
     .apply("write deserialization errors",
         CompositeTransform.of((PCollectionTuple input) -> {
           input.get(errorTag).apply(new MyErrorOutputTransform());
           return input.get(mainTag);
         }))
     .apply("more processing on the deserialized messages", new MyOtherTransform());

 I'd be interested in contributing a patch to add this functionality,
 perhaps as a static method PTransform.compose(). Would that patch be
 welcome? Are there other thoughts on naming?

 The full code of the CompositeTransform interface we're currently using
 is included below.


 public interface CompositeTransform<InputT extends PInput, OutputT extends POutput> {
   OutputT expand(InputT input);

   /**
    * The public factory method that serves as the entrypoint for users
    * to create a composite PTransform.
    */
   static <InputT extends PInput, OutputT extends POutput>
       PTransform<InputT, OutputT> of(CompositeTransform<InputT, OutputT> transform) {
     return new PTransform<InputT, OutputT>() {
       @Override
       public OutputT expand(InputT input) {
         return transform.expand(input);
       }
     };
   }
 }
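
The delegation pattern behind CompositeTransform.of can be tried without a
Beam dependency. The sketch below is only an illustration under stated
assumptions: Transform, Composite, Outputs, and handleErrors are hypothetical
stand-ins (Transform for PTransform, Outputs for a PCollectionTuple with a
main and an error output), not Beam classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ComposeSketch {
  /** Stand-in for Beam's PTransform; only the expand() hook is modelled. */
  abstract static class Transform<InT, OutT> {
    abstract OutT expand(InT input);
  }

  /** Lambda-friendly counterpart, mirroring the interface proposed in the thread. */
  @FunctionalInterface
  interface Composite<InT, OutT> {
    OutT expand(InT input);

    /** Wraps a lambda in an anonymous Transform that delegates to it. */
    static <InT, OutT> Transform<InT, OutT> of(Composite<InT, OutT> fn) {
      return new Transform<InT, OutT>() {
        @Override
        OutT expand(InT input) {
          return fn.expand(input);
        }
      };
    }
  }

  /** Stand-in for a PCollectionTuple with a main and an error output. */
  static final class Outputs {
    final List<String> main;
    final List<String> errors;

    Outputs(List<String> main, List<String> errors) {
      this.main = main;
      this.errors = errors;
    }
  }

  /** Sends errors to a side sink and passes the main output downstream. */
  static List<String> handleErrors(Outputs outputs, List<String> errorSink) {
    Transform<Outputs, List<String>> step =
        Composite.of((Outputs in) -> {
          errorSink.addAll(in.errors); // side branch: "write" the errors
          return in.main;              // main branch continues downstream
        });
    return step.expand(outputs);
  }

  public static void main(String[] args) {
    List<String> sink = new ArrayList<>();
    Outputs parsed = new Outputs(Arrays.asList("a", "b"), Arrays.asList("bad-record"));
    System.out.println(handleErrors(parsed, sink) + " errors=" + sink);
  }
}
```

The factory adds no behaviour of its own; it only adapts a functional
interface to the abstract class, which is why questions such as coder
inference and display data fall to the transforms used inside the lambda.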






Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Ismaël Mejía
Congratulations and welcome Kenn as new chair!
Thanks Davor for your hard work too.

On Wed, Sep 19, 2018 at 11:14 PM Rui Wang  wrote:

> Congrats!
>
> -Rui
>
> On Wed, Sep 19, 2018 at 2:12 PM Chamikara Jayalath 
> wrote:
>
>> Congrats!
>>
>> On Wed, Sep 19, 2018 at 2:05 PM Ahmet Altay  wrote:
>>
>>> Congratulations, Kenn! And thank you Davor.
>>>
>>> On Wed, Sep 19, 2018 at 1:44 PM, Anton Kedin  wrote:
>>>
 Congrats!

 On Wed, Sep 19, 2018 at 1:36 PM Ankur Goenka  wrote:

> Congrats Kenn!
>
> On Wed, Sep 19, 2018 at 1:35 PM Amit Sela 
> wrote:
>
>> Well deserved! Congrats Kenn.
>>
>> On Wed, Sep 19, 2018 at 4:25 PM Kai Jiang  wrote:
>>
>>> Congrats, Kenn!
>>>
>>> On Wed, Sep 19, 2018 at 1:23 PM Alan Myrvold 
>>> wrote:
>>>
 Congrats, Kenn.

 On Wed, Sep 19, 2018 at 1:08 PM Maximilian Michels 
 wrote:

> Congrats!
>
> On 19.09.18 22:07, Robin Qiu wrote:
> > Congratulations, Kenn!
> >
> > On Wed, Sep 19, 2018 at 1:05 PM Lukasz Cwik wrote:
> >
> > Congrats Kenn.
> >
> > On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci <da...@apache.org> wrote:
> >
> > Hi everyone --
> > It is with great pleasure that I announce that at today's
> > meeting of the Foundation's Board of Directors, the
> Board has
> > appointed Kenneth Knowles as the second chair of the
> Apache Beam
> > project.
> >
> > Kenn has served on the PMC since its inception, and is
> very
> > active and effective in growing the community. His
> exemplary
> > posts have been cited in other projects. I'm super happy
> to have
> > Kenn accepted the nomination, and I'm confident that
> he'll serve
> > with distinction.
> >
> > As for myself, I'm not going anywhere. I'm still around
> and will
> > be as active as I have recently been. Thrilled to be
> able to
> > pass the baton to such a key member of this community
> and to
> > have less administrative work to do ;-).
> >
> > Please join me in welcoming Kenn to his new role, and I
> ask that
> > you support him as much as possible. As always, please
> let me
> > know if you have any questions.
> >
> > Davor
> >
>

>>>


Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

2018-09-19 Thread Jeff Klukas
Thanks for the thoughts, Lukasz. These are exactly the kinds of issues I
was hoping to get context on, since I don't yet have extensive experience
with beam.

I have not yet run into issues where the output coder was not able to be
inferred. I expect this may be a non-issue, as the individual transforms
used within a user-provided lambda expression would presumably expose the
ability to specify a coder.

I don't have enough context yet to comment on whether display data might be
an issue, so I do hope the user list can provide input there.

On Wed, Sep 19, 2018 at 4:52 PM Lukasz Cwik  wrote:

> Thanks for the proposal and it does seem to make the API cleaner to build
> anonymous composite transforms.
>
> In your experience have you had issues where the API doesn't work out well
> because the PTransform:
> * is not able to override how the output coder is inferred?
> * can't supply display data?
>
> +u...@beam.apache.org , do users think that the
> provided API would be useful enough for it to be added to the core SDK or
> would the addition of the method provide noise/detract from the existing
> API?
>
> On Mon, Sep 17, 2018 at 12:57 PM Jeff Klukas  wrote:
>
>> I've gone ahead and filed a JIRA Issue and GitHub PR to follow up on this
>> suggestion and make it more concrete:
>>
>> https://issues.apache.org/jira/browse/BEAM-5413
>> https://github.com/apache/beam/pull/6414
>>
>> On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas  wrote:
>>
>>> Hello all, I'm a data engineer at Mozilla working on a first project
>>> using Beam. I've been impressed with the usability of the API as there are
>>> good built-in solutions for handling many simple transformation cases with
>>> minimal code, and wanted to discuss one bit of ergonomics that seems to be
>>> missing.
>>>
>>> It appears that none of the existing PTransform factories are generic
>>> enough to take in or output a PCollectionTuple, but we've found many use
>>> cases where it's convenient to apply a few transforms on a PCollectionTuple
>>> in a lambda expression.
>>>
>>> For example, we've defined several PTransforms that return main and
>>> error output stream bundled in a PCollectionTuple. We defined a
>>> CompositeTransform interface so that we could handle the error output in a
>>> lambda expression like:
>>>
>>> pipeline
>>>     .apply("attempt to deserialize messages", new MyDeserializationTransform())
>>>     .apply("write deserialization errors",
>>>         CompositeTransform.of((PCollectionTuple input) -> {
>>>           input.get(errorTag).apply(new MyErrorOutputTransform());
>>>           return input.get(mainTag);
>>>         }))
>>>     .apply("more processing on the deserialized messages", new MyOtherTransform());
>>>
>>> I'd be interested in contributing a patch to add this functionality,
>>> perhaps as a static method PTransform.compose(). Would that patch be
>>> welcome? Are there other thoughts on naming?
>>>
>>> The full code of the CompositeTransform interface we're currently using
>>> is included below.
>>>
>>>
>>> public interface CompositeTransform<InputT extends PInput, OutputT extends POutput> {
>>>   OutputT expand(InputT input);
>>>
>>>   /**
>>>    * The public factory method that serves as the entrypoint for users
>>>    * to create a composite PTransform.
>>>    */
>>>   static <InputT extends PInput, OutputT extends POutput>
>>>       PTransform<InputT, OutputT> of(CompositeTransform<InputT, OutputT> transform) {
>>>     return new PTransform<InputT, OutputT>() {
>>>       @Override
>>>       public OutputT expand(InputT input) {
>>>         return transform.expand(input);
>>>       }
>>>     };
>>>   }
>>> }
>>>
>>>
>>>
>>>


Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Rui Wang
Congrats!

-Rui

On Wed, Sep 19, 2018 at 2:12 PM Chamikara Jayalath 
wrote:

> Congrats!
>
> On Wed, Sep 19, 2018 at 2:05 PM Ahmet Altay  wrote:
>
>> Congratulations, Kenn! And thank you Davor.
>>
>> On Wed, Sep 19, 2018 at 1:44 PM, Anton Kedin  wrote:
>>
>>> Congrats!
>>>
>>> On Wed, Sep 19, 2018 at 1:36 PM Ankur Goenka  wrote:
>>>
 Congrats Kenn!

 On Wed, Sep 19, 2018 at 1:35 PM Amit Sela  wrote:

> Well deserved! Congrats Kenn.
>
> On Wed, Sep 19, 2018 at 4:25 PM Kai Jiang  wrote:
>
>> Congrats, Kenn!
>>
>> On Wed, Sep 19, 2018 at 1:23 PM Alan Myrvold 
>> wrote:
>>
>>> Congrats, Kenn.
>>>
>>> On Wed, Sep 19, 2018 at 1:08 PM Maximilian Michels 
>>> wrote:
>>>
 Congrats!

 On 19.09.18 22:07, Robin Qiu wrote:
 > Congratulations, Kenn!
 >
 > On Wed, Sep 19, 2018 at 1:05 PM Lukasz Cwik wrote:
 >
 > Congrats Kenn.
 >
 > On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci <da...@apache.org> wrote:
 >
 > Hi everyone --
 > It is with great pleasure that I announce that at today's
 > meeting of the Foundation's Board of Directors, the Board
 has
 > appointed Kenneth Knowles as the second chair of the
 Apache Beam
 > project.
 >
 > Kenn has served on the PMC since its inception, and is
 very
 > active and effective in growing the community. His
 exemplary
 > posts have been cited in other projects. I'm super happy
 to have
 > Kenn accepted the nomination, and I'm confident that
 he'll serve
 > with distinction.
 >
 > As for myself, I'm not going anywhere. I'm still around
 and will
 > be as active as I have recently been. Thrilled to be able
 to
 > pass the baton to such a key member of this community and
 to
 > have less administrative work to do ;-).
 >
 > Please join me in welcoming Kenn to his new role, and I
 ask that
 > you support him as much as possible. As always, please
 let me
 > know if you have any questions.
 >
 > Davor
 >

>>>
>>


Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Chamikara Jayalath
Congrats!

On Wed, Sep 19, 2018 at 2:05 PM Ahmet Altay  wrote:

> Congratulations, Kenn! And thank you Davor.
>
> On Wed, Sep 19, 2018 at 1:44 PM, Anton Kedin  wrote:
>
>> Congrats!
>>
>> On Wed, Sep 19, 2018 at 1:36 PM Ankur Goenka  wrote:
>>
>>> Congrats Kenn!
>>>
>>> On Wed, Sep 19, 2018 at 1:35 PM Amit Sela  wrote:
>>>
 Well deserved! Congrats Kenn.

 On Wed, Sep 19, 2018 at 4:25 PM Kai Jiang  wrote:

> Congrats, Kenn!
>
> On Wed, Sep 19, 2018 at 1:23 PM Alan Myrvold 
> wrote:
>
>> Congrats, Kenn.
>>
>> On Wed, Sep 19, 2018 at 1:08 PM Maximilian Michels 
>> wrote:
>>
>>> Congrats!
>>>
>>> On 19.09.18 22:07, Robin Qiu wrote:
>>> > Congratulations, Kenn!
>>> >
>>> > On Wed, Sep 19, 2018 at 1:05 PM Lukasz Cwik wrote:
>>> >
>>> > Congrats Kenn.
>>> >
>>> > On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci <da...@apache.org> wrote:
>>> >
>>> > Hi everyone --
>>> > It is with great pleasure that I announce that at today's
>>> > meeting of the Foundation's Board of Directors, the Board
>>> has
>>> > appointed Kenneth Knowles as the second chair of the
>>> Apache Beam
>>> > project.
>>> >
>>> > Kenn has served on the PMC since its inception, and is very
>>> > active and effective in growing the community. His
>>> exemplary
>>> > posts have been cited in other projects. I'm super happy
>>> to have
>>> > Kenn accepted the nomination, and I'm confident that he'll
>>> serve
>>> > with distinction.
>>> >
>>> > As for myself, I'm not going anywhere. I'm still around
>>> and will
>>> > be as active as I have recently been. Thrilled to be able
>>> to
>>> > pass the baton to such a key member of this community and
>>> to
>>> > have less administrative work to do ;-).
>>> >
>>> > Please join me in welcoming Kenn to his new role, and I
>>> ask that
>>> > you support him as much as possible. As always, please let
>>> me
>>> > know if you have any questions.
>>> >
>>> > Davor
>>> >
>>>
>>
>


Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Ahmet Altay
Congratulations, Kenn! And thank you Davor.

On Wed, Sep 19, 2018 at 1:44 PM, Anton Kedin  wrote:

> Congrats!
>
> On Wed, Sep 19, 2018 at 1:36 PM Ankur Goenka  wrote:
>
>> Congrats Kenn!
>>
>> On Wed, Sep 19, 2018 at 1:35 PM Amit Sela  wrote:
>>
>>> Well deserved! Congrats Kenn.
>>>
>>> On Wed, Sep 19, 2018 at 4:25 PM Kai Jiang  wrote:
>>>
 Congrats, Kenn!

 On Wed, Sep 19, 2018 at 1:23 PM Alan Myrvold 
 wrote:

> Congrats, Kenn.
>
> On Wed, Sep 19, 2018 at 1:08 PM Maximilian Michels 
> wrote:
>
>> Congrats!
>>
>> On 19.09.18 22:07, Robin Qiu wrote:
>> > Congratulations, Kenn!
>> >
>> > On Wed, Sep 19, 2018 at 1:05 PM Lukasz Cwik wrote:
>> >
>> > Congrats Kenn.
>> >
>> > On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci wrote:
>> >
>> > Hi everyone --
>> > It is with great pleasure that I announce that at today's
>> > meeting of the Foundation's Board of Directors, the Board
>> has
>> > appointed Kenneth Knowles as the second chair of the Apache
>> Beam
>> > project.
>> >
>> > Kenn has served on the PMC since its inception, and is very
>> > active and effective in growing the community. His exemplary
>> > posts have been cited in other projects. I'm super happy to
>> have
>> > Kenn accepted the nomination, and I'm confident that he'll
>> serve
>> > with distinction.
>> >
>> > As for myself, I'm not going anywhere. I'm still around and
>> will
>> > be as active as I have recently been. Thrilled to be able to
>> > pass the baton to such a key member of this community and to
>> > have less administrative work to do ;-).
>> >
>> > Please join me in welcoming Kenn to his new role, and I ask
>> that
>> > you support him as much as possible. As always, please let
>> me
>> > know if you have any questions.
>> >
>> > Davor
>> >
>>
>


Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

2018-09-19 Thread Lukasz Cwik
Thanks for the proposal and it does seem to make the API cleaner to build
anonymous composite transforms.

In your experience have you had issues where the API doesn't work out well
because the PTransform:
* is not able to override how the output coder is inferred?
* can't supply display data?

+u...@beam.apache.org, do users think that the provided API would be useful
enough to be added to the core SDK, or would the addition of the method
add noise to or detract from the existing API?

On Mon, Sep 17, 2018 at 12:57 PM Jeff Klukas  wrote:

> I've gone ahead and filed a JIRA Issue and GitHub PR to follow up on this
> suggestion and make it more concrete:
>
> https://issues.apache.org/jira/browse/BEAM-5413
> https://github.com/apache/beam/pull/6414
>
> On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas  wrote:
>
>> Hello all, I'm a data engineer at Mozilla working on a first project
>> using Beam. I've been impressed with the usability of the API as there are
>> good built-in solutions for handling many simple transformation cases with
>> minimal code, and wanted to discuss one bit of ergonomics that seems to be
>> missing.
>>
>> It appears that none of the existing PTransform factories are generic
>> enough to take in or output a PCollectionTuple, but we've found many use
>> cases where it's convenient to apply a few transforms on a PCollectionTuple
>> in a lambda expression.
>>
>> For example, we've defined several PTransforms that return main and error
>> output stream bundled in a PCollectionTuple. We defined a
>> CompositeTransform interface so that we could handle the error output in a
>> lambda expression like:
>>
>> pipeline
>>     .apply("attempt to deserialize messages", new MyDeserializationTransform())
>>     .apply("write deserialization errors",
>>         CompositeTransform.of((PCollectionTuple input) -> {
>>           input.get(errorTag).apply(new MyErrorOutputTransform());
>>           return input.get(mainTag);
>>         }))
>>     .apply("more processing on the deserialized messages", new MyOtherTransform());
>>
>> I'd be interested in contributing a patch to add this functionality,
>> perhaps as a static method PTransform.compose(). Would that patch be
>> welcome? Are there other thoughts on naming?
>>
>> The full code of the CompositeTransform interface we're currently using
>> is included below.
>>
>>
>> public interface CompositeTransform<InputT extends PInput, OutputT extends POutput> {
>>   OutputT expand(InputT input);
>>
>>   /**
>>    * The public factory method that serves as the entrypoint for users to
>>    * create a composite PTransform.
>>    */
>>   static <InputT extends PInput, OutputT extends POutput>
>>       PTransform<InputT, OutputT> of(CompositeTransform<InputT, OutputT> transform) {
>>     return new PTransform<InputT, OutputT>() {
>>       @Override
>>       public OutputT expand(InputT input) {
>>         return transform.expand(input);
>>       }
>>     };
>>   }
>> }
>>
>>
>>
>>


Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Anton Kedin
Congrats!

On Wed, Sep 19, 2018 at 1:36 PM Ankur Goenka  wrote:

> Congrats Kenn!
>
> On Wed, Sep 19, 2018 at 1:35 PM Amit Sela  wrote:
>
>> Well deserved! Congrats Kenn.
>>
>> On Wed, Sep 19, 2018 at 4:25 PM Kai Jiang  wrote:
>>
>>> Congrats, Kenn!
>>>
>>> On Wed, Sep 19, 2018 at 1:23 PM Alan Myrvold 
>>> wrote:
>>>
 Congrats, Kenn.

 On Wed, Sep 19, 2018 at 1:08 PM Maximilian Michels 
 wrote:

> Congrats!
>
> On 19.09.18 22:07, Robin Qiu wrote:
> > Congratulations, Kenn!
> >
> > On Wed, Sep 19, 2018 at 1:05 PM Lukasz Cwik wrote:
> >
> > Congrats Kenn.
> >
> > On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci wrote:
> >
> > Hi everyone --
> > It is with great pleasure that I announce that at today's
> > meeting of the Foundation's Board of Directors, the Board has
> > appointed Kenneth Knowles as the second chair of the Apache
> Beam
> > project.
> >
> > Kenn has served on the PMC since its inception, and is very
> > active and effective in growing the community. His exemplary
> > posts have been cited in other projects. I'm super happy to
> have
> > Kenn accepted the nomination, and I'm confident that he'll
> serve
> > with distinction.
> >
> > As for myself, I'm not going anywhere. I'm still around and
> will
> > be as active as I have recently been. Thrilled to be able to
> > pass the baton to such a key member of this community and to
> > have less administrative work to do ;-).
> >
> > Please join me in welcoming Kenn to his new role, and I ask
> that
> > you support him as much as possible. As always, please let me
> > know if you have any questions.
> >
> > Davor
> >
>



Re: Add ruoyun into contributor list?

2018-09-19 Thread Ruoyun Huang
Thanks Ismaël!  :-)

On Wed, Sep 19, 2018 at 1:29 PM Ismaël Mejía  wrote:

> Done
> On Wed, Sep 19, 2018 at 10:17 PM ruo...@google.com 
> wrote:
> >
> > Hi, Folks,
> >
> >  Can some one add me as contributor in the Beam issue tracker?
> >
> >  account name in JIRA:  ruoyun
> >
> > Thanks!
>


-- 

Ruoyun  Huang


Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Ankur Goenka
Congrats Kenn!

On Wed, Sep 19, 2018 at 1:35 PM Amit Sela  wrote:

> Well deserved! Congrats Kenn.
>
> On Wed, Sep 19, 2018 at 4:25 PM Kai Jiang  wrote:
>
>> Congrats, Kenn!
>>
>> On Wed, Sep 19, 2018 at 1:23 PM Alan Myrvold  wrote:
>>
>>> Congrats, Kenn.
>>>
>>> On Wed, Sep 19, 2018 at 1:08 PM Maximilian Michels 
>>> wrote:
>>>
 Congrats!

 On 19.09.18 22:07, Robin Qiu wrote:
 > Congratulations, Kenn!
 >
 > On Wed, Sep 19, 2018 at 1:05 PM Lukasz Cwik wrote:
 >
 > Congrats Kenn.
 >
 > On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci wrote:
 >
 > Hi everyone --
 > It is with great pleasure that I announce that at today's
 > meeting of the Foundation's Board of Directors, the Board has
 > appointed Kenneth Knowles as the second chair of the Apache
 Beam
 > project.
 >
 > Kenn has served on the PMC since its inception, and is very
 > active and effective in growing the community. His exemplary
 > posts have been cited in other projects. I'm super happy to
 have
 > Kenn accepted the nomination, and I'm confident that he'll
 serve
 > with distinction.
 >
 > As for myself, I'm not going anywhere. I'm still around and
 will
 > be as active as I have recently been. Thrilled to be able to
 > pass the baton to such a key member of this community and to
 > have less administrative work to do ;-).
 >
 > Please join me in welcoming Kenn to his new role, and I ask
 that
 > you support him as much as possible. As always, please let me
 > know if you have any questions.
 >
 > Davor
 >

>>>


Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Amit Sela
Well deserved! Congrats Kenn.

On Wed, Sep 19, 2018 at 4:25 PM Kai Jiang  wrote:

> Congrats, Kenn!
>
> On Wed, Sep 19, 2018 at 1:23 PM Alan Myrvold  wrote:
>
>> Congrats, Kenn.
>>
>> On Wed, Sep 19, 2018 at 1:08 PM Maximilian Michels 
>> wrote:
>>
>>> Congrats!
>>>
>>> On 19.09.18 22:07, Robin Qiu wrote:
>>> > Congratulations, Kenn!
>>> >
>>> > On Wed, Sep 19, 2018 at 1:05 PM Lukasz Cwik wrote:
>>> >
>>> > Congrats Kenn.
>>> >
>>> > On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci wrote:
>>> >
>>> > Hi everyone --
>>> > It is with great pleasure that I announce that at today's
>>> > meeting of the Foundation's Board of Directors, the Board has
>>> > appointed Kenneth Knowles as the second chair of the Apache
>>> Beam
>>> > project.
>>> >
>>> > Kenn has served on the PMC since its inception, and is very
>>> > active and effective in growing the community. His exemplary
>>> > posts have been cited in other projects. I'm super happy to
>>> have
>>> > Kenn accepted the nomination, and I'm confident that he'll
>>> serve
>>> > with distinction.
>>> >
>>> > As for myself, I'm not going anywhere. I'm still around and
>>> will
>>> > be as active as I have recently been. Thrilled to be able to
>>> > pass the baton to such a key member of this community and to
>>> > have less administrative work to do ;-).
>>> >
>>> > Please join me in welcoming Kenn to his new role, and I ask
>>> that
>>> > you support him as much as possible. As always, please let me
>>> > know if you have any questions.
>>> >
>>> > Davor
>>> >
>>>
>>


Re: [DISCUSS] Committer Guidelines / Hygene before merging PRs

2018-09-19 Thread Maximilian Michels
I tend to agree with you, Lukasz. Of course we should try to follow the
guidelines as much as possible, but if it requires an extra back and
forth with the PR author for a cosmetic change, it may not be worth the
time.


On 19.09.18 22:17, Lukasz Cwik wrote:
I have to say I'm guilty of not following the merge guidelines,
sometimes doing merges without rebasing or flattening commits.


I find that it takes a few extra minutes of my time to fix someone's PR
history if they have more than one logical commit they want to keep
separate, and it usually takes days for the PR author to do the merging;
the extra burden as a committer of keeping track of another PR and its
state (waiting for clean-up) is taxing. I really liked the idea of the
mergebot (even though it didn't work out in practice) because it could do
all the policy work on my behalf.


Anything that reduces my overhead as a committer is useful: of the
100s of PRs that I have merged, I've only had to roll back a couple, so
I'm for Charles's suggestion, which makes the rollback flow slightly more
complicated in exchange for a significantly easier PR merge workflow.


On Wed, Sep 19, 2018 at 1:13 PM Charles Chen wrote:


What I mean is that if you get the first-parent commit using "git
log --first-parent", it will incorporate any and all fix up commits
so we don't need to worry about missing any.

On Wed, Sep 19, 2018, 1:07 PM Maximilian Michels <m...@apache.org> wrote:

Generally, +1 for isolated commits which are easy to revert.

 > I don't think it's actually harder to roll back a set of
commits that are merged together.
I think Thomas was mainly concerned about "fixup" commits landing in
master (as part of a merge). These indeed make reverting commits more
difficult because you have to check whether you missed a "fixup".

 > Ideally every commit should compile and pass tests though, right?

That is definitely what we should strive for when doing a merge
against
master.

 > Perhaps the bigger issue is that we need better documentation
and a playbook on how to do this these common tasks in git.

We do actually have basic documentation about this but most
people don't
read it. For example, the commit message of a Merge commit
should be:

Merge pull request #: [BEAM-] Issue title

But most merge commits don't comply with this rule :) See
https://beam.apache.org/contribute/committer-guide/#merging-it

On 19.09.18 21:34, Reuven Lax wrote:
 > Ideally every commit should compile and pass tests though, right?
 >
 > On Wed, Sep 19, 2018 at 12:15 PM Ankur Goenka <goe...@google.com> wrote:
 >
 >     I agree with the cleanliness of the commit history.
 >     "Fixup!", "Address comments", "Address even more comments" type
 >     of comments do not convey meaningful information and are not
 >     very useful. It's a good idea to squash them.
 >     However, I think it's OK to keep separate commits for different
 >     logical pieces of the code, which makes reviewing and revisiting
 >     code easier.
 >     Example PR: Support X in the pipeline
 >     Commit 1: Restructuring a bunch of code without any logical change.
 >     Commit 2: Changing validation logic for pipeline.
 >     Commit 3: Supporting new field "X" for pipeline.
 >
 >     On Wed, Sep 19, 2018 at 11:27 AM Charles Chen <c...@google.com> wrote:
 >
 >         To be concrete, it is very easy to revert a commit in any case:
 >
 >          1. First, use "git log --first-parent" to find the
 >             first-parent commit corresponding to a PR merge (this is
 >             a one-to-one correspondence).
 >          2. Use "git revert -m 1 " to revert the commit; this
 >             selects the first parent as the base for a merge commit
 >             (in the case where a single commit needs to be reverted,
 >             just use "git revert " without the "-m 1" flag).
 >
 >         In any case, as a general good engineering practice, I do
 >         agree that it is highly desirable to have small independent
 >         PRs instead of large jumbo PRs whenever possible.
 >
 >         On Wed, Sep 19, 2018 at 11:20 AM Charles Chen <c...@google.com> wrote:
 >
 >             I don't think it

Re: Add ruoyun into contributor list?

2018-09-19 Thread Ismaël Mejía
Done
On Wed, Sep 19, 2018 at 10:17 PM ruo...@google.com  wrote:
>
> Hi, Folks,
>
>  Can someone add me as a contributor in the Beam issue tracker?
>
>  account name in JIRA:  ruoyun
>
> Thanks!


Re: [DISCUSS] Committer Guidelines / Hygene before merging PRs

2018-09-19 Thread Maximilian Michels
I was also thinking about the possibility of wanting to revert 
individual commits from a merge commit. The solution you propose works, 
but only if you want to revert everything.
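
The distinction drawn here can be tried out in a throwaway repository. The
following is only a sketch under assumptions: the file names, branch names
and commit messages are hypothetical, and the two PR commits are independent
of each other, so the single-commit revert applies cleanly. It reverts the
whole merge on one branch with "git revert -m 1" and reverts just one commit
of the merged PR on main with a plain "git revert".

```shell
set -eu
repo=$(mktemp -d)
cd "$repo"
git init -q
git symbolic-ref HEAD refs/heads/main
git config user.name "Example"
git config user.email "example@example.invalid"
git config commit.gpgsign false

echo base > base.txt
git add base.txt
git commit -q -m "Initial commit"

# Hypothetical PR branch with two independent commits.
git checkout -q -b pr-branch
echo a > a.txt
git add a.txt
git commit -q -m "Commit 1: add a.txt"
echo b > b.txt
git add b.txt
git commit -q -m "Commit 2: add b.txt"
commit2=$(git rev-parse HEAD)

# Merge with --no-ff so that a merge commit exists on main.
git checkout -q main
git merge -q --no-ff -m "Merge hypothetical PR" pr-branch
merge=$(git rev-parse HEAD)

# Option 1: revert the whole PR by reverting the merge commit.
git checkout -q -b revert-all "$merge"
git revert --no-edit -m 1 "$merge" >/dev/null
all_files=$(echo $(ls))

# Option 2: revert only the second commit that came in with the PR.
git checkout -q main
git revert --no-edit "$commit2" >/dev/null
one_files=$(echo $(ls))

echo "after reverting the merge:   $all_files"
echo "after reverting one commit:  $one_files"
```

If the commits inside a PR depend on each other, or a later "fixup" touches
the same lines, the single-commit revert may conflict or leave the tree in
an inconsistent state, which is the concern raised in this thread.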


On 19.09.18 22:12, Charles Chen wrote:
What I mean is that if you get the first-parent commit using "git log 
--first-parent", it will incorporate any and all fix up commits so we 
don't need to worry about missing any.


On Wed, Sep 19, 2018, 1:07 PM Maximilian Michels wrote:

Generally, +1 for isolated commits which are easy to revert.

 > I don't think it's actually harder to roll back a set of commits
 > that are merged together.

I think Thomas was mainly concerned about "fixup" commits landing in
master (as part of a merge). These indeed make reverting commits more
difficult because you have to check whether you missed a "fixup".

 > Ideally every commit should compile and pass tests though, right?

That is definitely what we should strive for when doing a merge against
master.

 > Perhaps the bigger issue is that we need better documentation and a
 > playbook on how to do these common tasks in git.

We do actually have basic documentation about this but most people don't
read it. For example, the commit message of a Merge commit should be:

Merge pull request #: [BEAM-] Issue title

But most merge commits don't comply with this rule :) See
https://beam.apache.org/contribute/committer-guide/#merging-it

On 19.09.18 21:34, Reuven Lax wrote:
 > Ideally every commit should compile and pass tests though, right?
 >
 > On Wed, Sep 19, 2018 at 12:15 PM Ankur Goenka <goe...@google.com> wrote:
 >
 >     I agree with the cleanliness of the commit history.
 >     "Fixup!", "Address comments", "Address even more comments" type
 >     of comments do not convey meaningful information and are not
 >     very useful. It's a good idea to squash them.
 >     However, I think it's OK to keep separate commits for different
 >     logical pieces of the code, which makes reviewing and revisiting
 >     code easier.
 >     Example PR: Support X in the pipeline
 >     Commit 1: Restructuring a bunch of code without any logical change.
 >     Commit 2: Changing validation logic for pipeline.
 >     Commit 3: Supporting new field "X" for pipeline.
 >
 >     On Wed, Sep 19, 2018 at 11:27 AM Charles Chen <c...@google.com> wrote:
 >
 >         To be concrete, it is very easy to revert a commit in any case:
 >
 >          1. First, use "git log --first-parent" to find the
 >             first-parent commit corresponding to a PR merge (this is
 >             a one-to-one correspondence).
 >          2. Use "git revert -m 1 " to revert the commit; this
 >             selects the first parent as the base for a merge commit
 >             (in the case where a single commit needs to be reverted,
 >             just use "git revert " without the "-m 1" flag).
 >
 >         In any case, as a general good engineering practice, I do
 >         agree that it is highly desirable to have small independent
 >         PRs instead of large jumbo PRs whenever possible.
 >
 >         On Wed, Sep 19, 2018 at 11:20 AM Charles Chen <c...@google.com> wrote:
 >
 >             I don't think it's actually harder to roll back a set of
 >             commits that are merged together.  Git has the notion of
 >             first-parent commits (you can see, for example, "git log
 >             --first-parent", which filters out the intermediate
 >             commits).  In this sense, PRs still get merged as one
 >             unit and this is preserved even if intermediate commits
 >             are kept.  Perhaps the bigger issue is that we need
 >             better documentation and a playbook on how to do these
 >             common tasks in git.
 >
 >             On Wed, Sep 19, 2018 at 9:27 AM Thomas Weise <t...@apache.org> wrote:
 >
 >                 Wanted to bring this up as a reminder as well as an
 >                 opportunity to discuss potential changes to our
 >                 committer guide. It has been a while since the last
 >                 related discussion and we have welcomed several new
 >                 committers since then.
 >
 >                 Finishing up pull requests pre-merge:
 >
 >                 https://beam.apache.org/contribute/committer-guide/#finishing-touches
 >
 >                 PRs are worked on 

Re: Is Splittable DoFn suitable for fetch data from a socket server?

2018-09-19 Thread Lukasz Cwik
Before getting into what you could use and the current state of
SplittableDoFn and its supported features, I was wondering what reliability
guarantees the socket server has around messages.

Is receiving and processing each message reliably important or is it ok to
drop messages when things fail?
Is there a message acknowledgement system or can you request a position
within the message stream (e.g. send all messages from position X when
connecting and if for whatever reason you need to reconnect you can say
send messages from position X to replay past messages)?




On Tue, Sep 18, 2018 at 5:00 PM flyisland  wrote:

>
> Hi Gurus,
>
> I'm trying to create an IO connector to fetch data from a socket server
> from Beam. I'm new to Beam, but according to this blog <
> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html>, it seems
> that SDF is the recommended way to implement an IO connector now.
>
> This in-house built socket server can accept multiple clients, but only
> sends messages to the first-connected client; it sends messages to the
> second client only after the first one has disconnected.
>
> To understand the lifecycle of a DoFn, I've just created a very simple
> DoFn subclass that calls log.debug() in every method. According to the
> JavaDoc of DoFn.Setup(), "This is a good place to initialize transient
> in-memory resources, such as network connections. The resources can then be
> disposed in DoFn.Teardown." I guess I should create the connection to the
> socket server in the setup() method.
>
> But based on the log messages below, even though the input PCollection has
> only one element, Beam still creates multiple DemoIO instances and
> invokes a different DemoIO instance after every checkpoint.
>
> I'm wondering:
> 1. How could I let Beam create only one DemoIO instance, or at least use
> the same instance constantly?
> 2. Or should I use the Source API for such purpose?
>
> Thanks in advance.
>
> Logs:
> 07:15:55:586 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@60a58077->setup() is called!
> 07:15:55:624 [direct-runner-worker] [DEBUG] DemoIO -
> First->getInitialRestriction() is called!
> 07:15:55:641 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@417eede1->setup() is called!
> 07:15:55:711 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@2aa2413a->setup() is called!
> 07:15:55:714 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@2aa2413a->startBundle() is called!
> 07:15:55:775 [direct-runner-worker] [DEBUG] DemoIO - [0,
> 9223372036854775807)->newTracker() is called!
> 07:15:55:779 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
> called!
> 07:15:56:787 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@2aa2413a->process(OffsetRangeTracker{range=[0,
> 2), lastClaimedOffset=1, lastAttemptedOffset=2}) end!
> 07:15:56:801 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@2aa2413a->finishBundle() is called!
> 07:15:56:841 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@30c7fe55->setup() is called!
> 07:15:56:842 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
> 2018-09-18T23:15:56.285Z -> 0 -> First
> 07:15:56:843 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@30c7fe55->startBundle() is called!
> 07:15:56:845 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
> 2018-09-18T23:15:56.786Z -> 1 -> First
> 07:15:56:848 [direct-runner-worker] [DEBUG] DemoIO - [2,
> 9223372036854775807)->newTracker() is called!
> 07:15:56:850 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
> 9223372036854775807), lastClaimedOffset=null, lastAttemptedOffset=null}) is
> called!
> 07:15:58:358 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@30c7fe55->process(OffsetRangeTracker{range=[2,
> 5), lastClaimedOffset=4, lastAttemptedOffset=5}) end!
> 07:15:58:361 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@30c7fe55->finishBundle() is called!
> 07:15:58:366 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
> 2018-09-18T23:15:57.354Z -> 2 -> First
> 07:15:58:367 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@142109e->setup() is called!
> 07:15:58:369 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
> 2018-09-18T23:15:57.856Z -> 3 -> First
> 07:15:58:369 [direct-runner-worker] [DEBUG] DemoIO -
> org.apache.beam.examples.DemoIO@142109e->startBundle() is called!
> 07:15:58:371 [direct-runner-worker] [DEBUG] WindowedWordCountSDF -
> 2018-09-18T23:15:58.358Z -> 4 -> First
> 07:15:58:373 [direct-runner-worker] [DEBUG] DemoIO - [5,
> 9223372036854775807)->newTracker() is called!
> 07:15:58:375 [direct-ru

Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Kai Jiang
Congrats, Kenn!

On Wed, Sep 19, 2018 at 1:23 PM Alan Myrvold  wrote:

> Congrats, Kenn.
>
> On Wed, Sep 19, 2018 at 1:08 PM Maximilian Michels  wrote:
>
>> Congrats!
>>
>> On 19.09.18 22:07, Robin Qiu wrote:
>> > Congratulations, Kenn!
>> >
>> > On Wed, Sep 19, 2018 at 1:05 PM Lukasz Cwik wrote:
>> >
>> > Congrats Kenn.
>> >
>> > On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci wrote:
>> >
>> > Hi everyone --
>> > It is with great pleasure that I announce that at today's
>> > meeting of the Foundation's Board of Directors, the Board has
>> > appointed Kenneth Knowles as the second chair of the Apache Beam
>> > project.
>> >
>> > Kenn has served on the PMC since its inception, and is very
>> > active and effective in growing the community. His exemplary
>> > posts have been cited in other projects. I'm super happy to have
>> > Kenn accepted the nomination, and I'm confident that he'll serve
>> > with distinction.
>> >
>> > As for myself, I'm not going anywhere. I'm still around and will
>> > be as active as I have recently been. Thrilled to be able to
>> > pass the baton to such a key member of this community and to
>> > have less administrative work to do ;-).
>> >
>> > Please join me in welcoming Kenn to his new role, and I ask that
>> > you support him as much as possible. As always, please let me
>> > know if you have any questions.
>> >
>> > Davor
>> >
>>
>


Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Andrew Psaltis
Congratulations, Kenn!

On Thu, Sep 20, 2018 at 12:23 AM Alan Myrvold  wrote:

> Congrats, Kenn.
>
> On Wed, Sep 19, 2018 at 1:08 PM Maximilian Michels  wrote:
>
>> Congrats!
>>
>> On 19.09.18 22:07, Robin Qiu wrote:
>> > Congratulations, Kenn!
>> >
>> > On Wed, Sep 19, 2018 at 1:05 PM Lukasz Cwik wrote:
>> >
>> > Congrats Kenn.
>> >
>> > On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci wrote:
>> >
>> > Hi everyone --
>> > It is with great pleasure that I announce that at today's
>> > meeting of the Foundation's Board of Directors, the Board has
>> > appointed Kenneth Knowles as the second chair of the Apache Beam
>> > project.
>> >
>> > Kenn has served on the PMC since its inception, and is very
>> > active and effective in growing the community. His exemplary
>> > posts have been cited in other projects. I'm super happy to have
>> > Kenn accepted the nomination, and I'm confident that he'll serve
>> > with distinction.
>> >
>> > As for myself, I'm not going anywhere. I'm still around and will
>> > be as active as I have recently been. Thrilled to be able to
>> > pass the baton to such a key member of this community and to
>> > have less administrative work to do ;-).
>> >
>> > Please join me in welcoming Kenn to his new role, and I ask that
>> > you support him as much as possible. As always, please let me
>> > know if you have any questions.
>> >
>> > Davor
>> >
>>
>


Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Alan Myrvold
Congrats, Kenn.

On Wed, Sep 19, 2018 at 1:08 PM Maximilian Michels  wrote:

> Congrats!
>
> On 19.09.18 22:07, Robin Qiu wrote:
> > Congratulations, Kenn!
> >
> > On Wed, Sep 19, 2018 at 1:05 PM Lukasz Cwik wrote:
> >
> > Congrats Kenn.
> >
> > On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci wrote:
> >
> > Hi everyone --
> > It is with great pleasure that I announce that at today's
> > meeting of the Foundation's Board of Directors, the Board has
> > appointed Kenneth Knowles as the second chair of the Apache Beam
> > project.
> >
> > Kenn has served on the PMC since its inception, and is very
> > active and effective in growing the community. His exemplary
> > posts have been cited in other projects. I'm super happy to have
> > Kenn accepted the nomination, and I'm confident that he'll serve
> > with distinction.
> >
> > As for myself, I'm not going anywhere. I'm still around and will
> > be as active as I have recently been. Thrilled to be able to
> > pass the baton to such a key member of this community and to
> > have less administrative work to do ;-).
> >
> > Please join me in welcoming Kenn to his new role, and I ask that
> > you support him as much as possible. As always, please let me
> > know if you have any questions.
> >
> > Davor
> >
>


Re: [DISCUSS] Committer Guidelines / Hygene before merging PRs

2018-09-19 Thread Lukasz Cwik
I have to say I'm guilty of not following the merge guidelines, sometimes
doing merges without rebasing/flattening commits.

It takes a few extra minutes of my time to fix someone's PR history if
they have more than one logical commit they want to keep separate, whereas it
usually takes days for the PR author to do it, and the extra burden
as a committer of keeping track of another PR and its state (waiting for
clean-up) is taxing. I really liked the idea of the mergebot (even though
it didn't work out in practice) because it could do all the policy work on
my behalf.

Anything that reduces my overhead as a committer is useful. Of the hundreds
of PRs that I have merged, I've only had to roll back a couple, so I'm for
Charles's suggestion, which makes the rollback flow slightly more complicated
in exchange for a significantly easier PR merge workflow.
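
For reference, the "fix the PR history" step mentioned above can be done
non-interactively when the follow-up commits were created with
"git commit --fixup". This is a minimal sketch in a throwaway repository,
not a description of the Beam committer workflow; the branch name, file name
and commit messages are hypothetical, and GIT_SEQUENCE_EDITOR=true is used
only to accept the todo list that --autosquash generates.

```shell
set -eu
repo=$(mktemp -d)
cd "$repo"
git init -q
git symbolic-ref HEAD refs/heads/main
git config user.name "Example"
git config user.email "example@example.invalid"
git config commit.gpgsign false

echo base > base.txt
git add base.txt
git commit -q -m "Initial commit"

# Hypothetical PR branch: one logical commit plus a review fixup.
git checkout -q -b pr-branch
echo v1 > feature.txt
git add feature.txt
git commit -q -m "Support X in the pipeline"
echo v2 > feature.txt
git commit -q -a --fixup=HEAD   # records "fixup! Support X in the pipeline"

before=$(git rev-list --count main..pr-branch)

# Fold every "fixup!" commit into the commit it targets.
GIT_SEQUENCE_EDITOR=true git rebase -i --autosquash main >/dev/null 2>&1

after=$(git rev-list --count main..pr-branch)
subject=$(git log -1 --format=%s)
content=$(cat feature.txt)
echo "commits before=$before after=$after subject=$subject"
```

Commits named "Address comments" rather than "fixup! ..." are not picked up
by --autosquash and would still need a manual interactive rebase.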

On Wed, Sep 19, 2018 at 1:13 PM Charles Chen  wrote:

> What I mean is that if you get the first-parent commit using "git log
> --first-parent", it will incorporate any and all fix up commits so we don't
> need to worry about missing any.
>
> On Wed, Sep 19, 2018, 1:07 PM Maximilian Michels  wrote:
>
>> Generally, +1 for isolated commits which are easy to revert.
>>
>> > I don't think it's actually harder to roll back a set of commits that
>> are merged together.
>> I think Thomas was mainly concerned about "fixup" commits to land in
>> master (as part of a merge). These indeed make reverting commits more
>> difficult because you have to check whether you missed a "fixup".
>>
>> > Ideally every commit should compile and pass tests though, right?
>>
>> That is definitely what we should strive for when doing a merge against
>> master.
>>
>> > Perhaps the bigger issue is that we need better documentation and a
>> playbook on how to do these common tasks in git.
>>
>> We do actually have basic documentation about this but most people don't
>> read it. For example, the commit message of a Merge commit should be:
>>
>> Merge pull request #: [BEAM-] Issue title
>>
>> But most merge commits don't comply with this rule :) See
>> https://beam.apache.org/contribute/committer-guide/#merging-it
>>
>> On 19.09.18 21:34, Reuven Lax wrote:
>> > Ideally every commit should compile and pass tests though, right?
>> >
>> > On Wed, Sep 19, 2018 at 12:15 PM Ankur Goenka wrote:
>> >
>> > I agree with the cleanliness of the commit history.
>> > "Fixup!", "Address comments", "Address even more comments" type of
>> > comments do not convey meaningful information and are not very
>> > useful. It's a good idea to squash them.
>> > However, I think it's OK to keep separate commits for different
>> > logical pieces of the code, which makes reviewing and revisiting code
>> > easier.
>> > Example PR: Support X in the pipeline
>> > Commit 1: Restructuring a bunch of code without any logical change.
>> > Commit 2: Changing validation logic for pipeline.
>> > Commit 3: Supporting new field "X" for pipeline.
>> >
>> > On Wed, Sep 19, 2018 at 11:27 AM Charles Chen wrote:
>> >
>> > To be concrete, it is very easy to revert a commit in any case:
>> >
>> >  1. First, use "git log --first-parent" to find the first-parent
>> > commit corresponding to a PR merge (this is a one-to-one
>> > correspondence).
>> >  2. Use "git revert -m 1 " to revert the commit; this
>> > selects the first parent as the base for a merge commit (in
>> > the case where a single commit needs to be reverted, just
>> > use "git revert " without the "-m 1" flag).
>> >
>> > In any case, as a general good engineering practice, I do agree
>> > that it is highly desirable to have small independent PRs
>> > instead of large jumbo PRs whenever possible.
>> >
>> > On Wed, Sep 19, 2018 at 11:20 AM Charles Chen wrote:
>> >
>> > I don't think it's actually harder to roll back a set of
>> > commits that are merged together.  Git has the notion of
>> > first-parent commits (you can see, for example, "git log
>> > --first-parent", which filters out the intermediate
>> > commits).  In this sense, PRs still get merged as one unit
>> > and this is preserved even if intermediate commits are
>> > kept.  Perhaps the bigger issue is that we need better
>> > documentation and a playbook on how to do these common
>> > tasks in git.
>> >
>> > On Wed, Sep 19, 2018 at 9:27 AM Thomas Weise <t...@apache.org> wrote:
>> >
>> > Wanted to bring this up as reminder as well as
>> > opportunity to discuss potential changes to our
>> > committer guide. It has been a while since last related
>

Add ruoyun into contributor list?

2018-09-19 Thread ruoyun
Hi, Folks,

 Can someone add me as a contributor in the Beam issue tracker? 

 account name in JIRA:  ruoyun

Thanks! 


Re: [DISCUSS] Committer Guidelines / Hygene before merging PRs

2018-09-19 Thread Charles Chen
What I mean is that if you get the first-parent commit using "git log
--first-parent", it will incorporate any and all fix up commits so we don't
need to worry about missing any.
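
To illustrate the point above, here is a small sketch in a throwaway
repository (the branch name, file name and commit messages are hypothetical).
It merges a branch that contains a fixup commit and compares the full log
with the first-parent log of main.

```shell
set -eu
repo=$(mktemp -d)
cd "$repo"
git init -q
git symbolic-ref HEAD refs/heads/main
git config user.name "Example"
git config user.email "example@example.invalid"
git config commit.gpgsign false

echo base > base.txt
git add base.txt
git commit -q -m "Initial commit"

# Hypothetical PR branch: one real change and one review fixup.
git checkout -q -b pr-branch
echo v1 > feature.txt
git add feature.txt
git commit -q -m "Support X in the pipeline"
echo v2 > feature.txt
git commit -q -a -m "Address comments"

# --no-ff forces a merge commit even though a fast-forward is possible.
git checkout -q main
git merge -q --no-ff -m "Merge hypothetical PR" pr-branch

# The full history lists every commit, including the fixup.
all_count=$(git log --oneline | wc -l | tr -d ' ')
# The first-parent history lists only what landed directly on main.
first_parent_count=$(git log --first-parent --oneline | wc -l | tr -d ' ')
echo "all=$all_count first-parent=$first_parent_count"
```

In the first-parent view the whole PR appears as the single merge commit, so
a "git revert -m 1" of that commit covers the fixup as well.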

On Wed, Sep 19, 2018, 1:07 PM Maximilian Michels  wrote:

> Generally, +1 for isolated commits which are easy to revert.
>
> > I don't think it's actually harder to roll back a set of commits that
> are merged together.
> I think Thomas was mainly concerned about "fixup" commits to land in
> master (as part of a merge). These indeed make reverting commits more
> difficult because you have to check whether you missed a "fixup".
>
> > Ideally every commit should compile and pass tests though, right?
>
> That is definitely what we should strive for when doing a merge against
> master.
>
> > Perhaps the bigger issue is that we need better documentation and a
> playbook on how to do these common tasks in git.
>
> We do actually have basic documentation about this but most people don't
> read it. For example, the commit message of a Merge commit should be:
>
> Merge pull request #: [BEAM-] Issue title
>
> But most merge commits don't comply with this rule :) See
> https://beam.apache.org/contribute/committer-guide/#merging-it
>
> On 19.09.18 21:34, Reuven Lax wrote:
> > Ideally every commit should compile and pass tests though, right?
> >
> > On Wed, Sep 19, 2018 at 12:15 PM Ankur Goenka wrote:
> >
> > I agree with the cleanliness of the commit history.
> > "Fixup!", "Address comments", "Address even more comments" type of
> > comments do not convey meaningful information and are not very
> > useful. It's a good idea to squash them.
> > However, I think it's OK to keep separate commits for different
> > logical pieces of the code, which makes reviewing and revisiting code
> > easier.
> > Example PR: Support X in the pipeline
> > Commit 1: Restructuring a bunch of code without any logical change.
> > Commit 2: Changing validation logic for pipeline.
> > Commit 3: Supporting new field "X" for pipeline.
> >
> > On Wed, Sep 19, 2018 at 11:27 AM Charles Chen wrote:
> >
> > To be concrete, it is very easy to revert a commit in any case:
> >
> >  1. First, use "git log --first-parent" to find the first-parent
> > commit corresponding to a PR merge (this is a one-to-one
> > correspondence).
> >  2. Use "git revert -m 1 " to revert the commit; this
> > selects the first parent as the base for a merge commit (in
> > the case where a single commit needs to be reverted, just
> > use "git revert " without the "-m 1" flag).
> >
> > In any case, as a general good engineering practice, I do agree
> > that it is highly desirable to have small independent PRs
> > instead of large jumbo PRs whenever possible.
> >
> > On Wed, Sep 19, 2018 at 11:20 AM Charles Chen wrote:
> >
> > I don't think it's actually harder to roll back a set of
> > commits that are merged together.  Git has the notion of
> > first-parent commits (you can see, for example, "git log
> > --first-parent", which filters out the intermediate
> > commits).  In this sense, PRs still get merged as one unit
> > and this is preserved even if intermediate commits are
> > kept.  Perhaps the bigger issue is that we need better
> > documentation and a playbook on how to do these common
> > tasks in git.
> >
> > On Wed, Sep 19, 2018 at 9:27 AM Thomas Weise wrote:
> >
> > Wanted to bring this up as reminder as well as
> > opportunity to discuss potential changes to our
> > committer guide. It has been a while since last related
> > discussion and we welcomed several new committers since
> > then.
> >
> > Finishing up pull requests pre-merge:
> >
> >
> https://beam.apache.org/contribute/committer-guide/#finishing-touches
> >
> > PRs are worked on over time and may accumulate many
> > commits. Sometimes because scope expands, sometimes just
> > to separate independent changes but most of the time the
> > commits are just fixups that are added as review
> progresses.
> > It is important that the latter get squashed prior to PR
> > merge, as otherwise we lose the ability to roll back
> > changes by reverting a single commit and also generally
> > cause a lot of noise in the commit history that does not
> > help other contributors. To be clear, I refer to the
> > "Fixup!", "Address comments", "Address even more
> > 

Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Maximilian Michels

Congrats!

On 19.09.18 22:07, Robin Qiu wrote:

Congratulations, Kenn!

On Wed, Sep 19, 2018 at 1:05 PM Lukasz Cwik wrote:


Congrats Kenn.

On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci <da...@apache.org> wrote:

Hi everyone --
It is with great pleasure that I announce that at today's
meeting of the Foundation's Board of Directors, the Board has
appointed Kenneth Knowles as the second chair of the Apache Beam
project.

Kenn has served on the PMC since its inception, and is very
active and effective in growing the community. His exemplary
posts have been cited in other projects. I'm super happy to have
Kenn accepted the nomination, and I'm confident that he'll serve
with distinction.

As for myself, I'm not going anywhere. I'm still around and will
be as active as I have recently been. Thrilled to be able to
pass the baton to such a key member of this community and to
have less administrative work to do ;-).

Please join me in welcoming Kenn to his new role, and I ask that
you support him as much as possible. As always, please let me
know if you have any questions.

Davor



Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Robin Qiu
Congratulations, Kenn!

On Wed, Sep 19, 2018 at 1:05 PM Lukasz Cwik  wrote:

> Congrats Kenn.
>
> On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci  wrote:
>
>> Hi everyone --
>> It is with great pleasure that I announce that at today's meeting of the
>> Foundation's Board of Directors, the Board has appointed Kenneth Knowles as
>> the second chair of the Apache Beam project.
>>
>> Kenn has served on the PMC since its inception, and is very active and
>> effective in growing the community. His exemplary posts have been cited in
>> other projects. I'm super happy to have Kenn accepted the nomination, and
>> I'm confident that he'll serve with distinction.
>>
>> As for myself, I'm not going anywhere. I'm still around and will be as
>> active as I have recently been. Thrilled to be able to pass the baton to
>> such a key member of this community and to have less administrative work to
>> do ;-).
>>
>> Please join me in welcoming Kenn to his new role, and I ask that you
>> support him as much as possible. As always, please let me know if you have
>> any questions.
>>
>> Davor
>>
>


Re: [DISCUSS] Committer Guidelines / Hygene before merging PRs

2018-09-19 Thread Maximilian Michels

Generally, +1 for isolated commits which are easy to revert.

 > I don't think it's actually harder to roll back a set of commits
 > that are merged together.

I think Thomas was mainly concerned about "fixup" commits landing in 
master (as part of a merge). These indeed make reverting commits more 
difficult because you have to check whether you missed a "fixup".

 > Ideally every commit should compile and pass tests though, right?

That is definitely what we should strive for when doing a merge against 
master.

 > Perhaps the bigger issue is that we need better documentation and a
 > playbook on how to do these common tasks in git.

We do actually have basic documentation about this but most people don't 
read it. For example, the commit message of a Merge commit should be:

Merge pull request #: [BEAM-] Issue title

But most merge commits don't comply with this rule :) See 
https://beam.apache.org/contribute/committer-guide/#merging-it
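
A convention like this could be checked mechanically. The sketch below is
only an illustration under assumptions: it assumes the "#" and "BEAM-"
placeholders above are each followed by a number, and the sample subjects,
including the numbers 1234 and 5678, are hypothetical. In a real checkout
the subjects could come from "git log --first-parent --merges --format=%s".

```shell
set -eu

# Assumed shape of a compliant merge subject (numbers are an assumption).
pattern='^Merge pull request #[0-9]+: \[BEAM-[0-9]+\] .+'

# Prints "ok" if the given subject matches the pattern, "bad" otherwise.
check() {
  if printf '%s\n' "$1" | grep -Eq "$pattern"; then
    echo ok
  else
    echo bad
  fi
}

# Hypothetical subjects, one in the assumed format and one not.
good=$(check "Merge pull request #1234: [BEAM-5678] Example issue title")
bad=$(check "Merge pull request #1234 from someone/some-branch")
echo "compliant: $good, non-compliant: $bad"
```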


On 19.09.18 21:34, Reuven Lax wrote:

Ideally every commit should compile and pass tests though, right?

On Wed, Sep 19, 2018 at 12:15 PM Ankur Goenka wrote:


I agree with the cleanliness of the commit history.
"Fixup!", "Address comments", "Address even more comments" type of
comments do not convey meaningful information and are not very
useful. It's a good idea to squash them.
However, I think it's OK to keep separate commits for different
logical pieces of the code, which makes reviewing and revisiting code
easier.
Example PR: Support X in the pipeline
Commit 1: Restructuring a bunch of code without any logical change.
Commit 2: Changing validation logic for pipeline.
Commit 3: Supporting new field "X" for pipeline.

On Wed, Sep 19, 2018 at 11:27 AM Charles Chen <c...@google.com> wrote:

To be concrete, it is very easy to revert a commit in any case:

 1. First, use "git log --first-parent" to find the first-parent
commit corresponding to a PR merge (this is a one-to-one
correspondence).
 2. Use "git revert -m 1 " to revert the commit; this
selects the first parent as the base for a merge commit (in
the case where a single commit needs to be reverted, just
use "git revert " without the "-m 1" flag).

In any case, as a general good engineering practice, I do agree
that it is highly desirable to have small independent PRs
instead of large jumbo PRs whenever possible.

On Wed, Sep 19, 2018 at 11:20 AM Charles Chen <c...@google.com> wrote:

I don't think it's actually harder to roll back a set of
commits that are merged together.  Git has the notion of
first-parent commits (you can see, for example, "git log
--first-parent", which filters out the intermediate
commits).  In this sense, PRs still get merged as one unit
and this is preserved even if intermediate commits are
kept.  Perhaps the bigger issue is that we need better
documentation and a playbook on how to do these common
tasks in git.

On Wed, Sep 19, 2018 at 9:27 AM Thomas Weise <t...@apache.org> wrote:

Wanted to bring this up as reminder as well as
opportunity to discuss potential changes to our
committer guide. It has been a while since last related
discussion and we welcomed several new committers since
then.

Finishing up pull requests pre-merge:


https://beam.apache.org/contribute/committer-guide/#finishing-touches

PRs are worked on over time and may accumulate many
commits. Sometimes because scope expands, sometimes just
to separate independent changes but most of the time the
commits are just fixups that are added as review progresses.
It is important that the latter get squashed prior to PR
merge, as otherwise we lose the ability to roll back
changes by reverting a single commit and also generally
cause a lot of noise in the commit history that does not
help other contributors. To be clear, I refer to the
"Fixup!", "Address comments", "Address even more
comments" type of entries :)

I would also propose that every commit gets tagged with
a JIRA (except those fixups that will be squashed).
Having the JIRA and possibly other tags makes it easier
for others not involved in the PR to identify changes
after they were merged, for example when looking at the
revision history or annotated source.

As for other scenarios of jumbo PRs with many commits,
there are p

Re: [ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Lukasz Cwik
Congrats Kenn.

On Wed, Sep 19, 2018 at 12:54 PM Davor Bonaci  wrote:

> Hi everyone --
> It is with great pleasure that I announce that at today's meeting of the
> Foundation's Board of Directors, the Board has appointed Kenneth Knowles as
> the second chair of the Apache Beam project.
>
> Kenn has served on the PMC since its inception, and is very active and
> effective in growing the community. His exemplary posts have been cited in
> other projects. I'm super happy that Kenn has accepted the nomination, and
> I'm confident that he'll serve with distinction.
>
> As for myself, I'm not going anywhere. I'm still around and will be as
> active as I have recently been. Thrilled to be able to pass the baton to
> such a key member of this community and to have less administrative work to
> do ;-).
>
> Please join me in welcoming Kenn to his new role, and I ask that you
> support him as much as possible. As always, please let me know if you have
> any questions.
>
> Davor
>


[ANNOUNCEMENT] New Beam chair: Kenneth Knowles

2018-09-19 Thread Davor Bonaci
Hi everyone --
It is with great pleasure that I announce that at today's meeting of the
Foundation's Board of Directors, the Board has appointed Kenneth Knowles as
the second chair of the Apache Beam project.

Kenn has served on the PMC since its inception, and is very active and
effective in growing the community. His exemplary posts have been cited in
other projects. I'm super happy that Kenn has accepted the nomination, and
I'm confident that he'll serve with distinction.

As for myself, I'm not going anywhere. I'm still around and will be as
active as I have recently been. Thrilled to be able to pass the baton to
such a key member of this community and to have less administrative work to
do ;-).

Please join me in welcoming Kenn to his new role, and I ask that you
support him as much as possible. As always, please let me know if you have
any questions.

Davor


Re: [DISCUSS] Committer Guidelines / Hygene before merging PRs

2018-09-19 Thread Reuven Lax
Ideally every commit should compile and pass tests though, right?

On Wed, Sep 19, 2018 at 12:15 PM Ankur Goenka  wrote:

> I agree about keeping the commit history clean.
> "Fixup!", "Address comments", "Address even more comments" type of
> commits do not convey meaningful information and are not very useful.
> It's a good idea to squash them.
> However, I think it's OK to keep separate commits for different logical
> pieces of the code, which makes reviewing and revisiting code easier.
> Example PR: Support X in the pipeline
> Commit 1: Restructuring a bunch of code without any logical change.
> Commit 2: Changing validation logic for pipeline.
> Commit 3: Supporting new field "X" for pipeline.
>
> On Wed, Sep 19, 2018 at 11:27 AM Charles Chen  wrote:
>
>> To be concrete, it is very easy to revert a commit in any case:
>>
>>1. First, use "git log --first-parent" to find the first-parent
>>commit corresponding to a PR merge (this is a one-to-one correspondence).
>>2. Use "git revert -m 1 " to revert the commit; this
>>selects the first parent as the base for a merge commit (in the case where
>>a single commit needs to be reverted, just use "git revert "
>>without the "-m 1" flag).
>>
>> In any case, as a general good engineering practice, I do agree that it
>> is highly desirable to have small independent PRs instead of large jumbo
>> PRs whenever possible.
>>
>> On Wed, Sep 19, 2018 at 11:20 AM Charles Chen  wrote:
>>
>>> I don't think it's actually harder to roll back a set of commits that
>>> are merged together.  Git has the notion of first-parent commits (you can
>>> see, for example, "git log --first-parent", which filters out the
>>> intermediate commits).  In this sense, PRs still get merged as one unit and
>>> this is preserved even if intermediate commits are kept.  Perhaps the
>>> bigger issue is that we need better documentation and a playbook on how to
>>> do these common tasks in git.
>>>
>>> On Wed, Sep 19, 2018 at 9:27 AM Thomas Weise  wrote:
>>>
>>>> Wanted to bring this up as reminder as well as opportunity to discuss
>>>> potential changes to our committer guide. It has been a while since last
>>>> related discussion and we welcomed several new committers since then.
>>>>
>>>> Finishing up pull requests pre-merge:
>>>>
>>>> https://beam.apache.org/contribute/committer-guide/#finishing-touches
>>>>
>>>> PRs are worked on over time and may accumulate many commits. Sometimes
>>>> because scope expands, sometimes just to separate independent changes but
>>>> most of the time the commits are just fixups that are added as review
>>>> progresses.
>>>>
>>>> It is important that the latter get squashed prior to PR merge, as
>>>> otherwise we lose the ability to roll back changes by reverting a single
>>>> commit and also generally cause a lot of noise in the commit history that
>>>> does not help other contributors. To be clear, I refer to the "Fixup!",
>>>> "Address comments", "Address even more comments" type of entries :)
>>>>
>>>> I would also propose that every commit gets tagged with a JIRA (except
>>>> those fixups that will be squashed). Having the JIRA and possibly other
>>>> tags makes it easier for others not involved in the PR to identify changes
>>>> after they were merged, for example when looking at the revision history or
>>>> annotated source.
>>>>
>>>> As for other scenarios of jumbo PRs with many commits, there are
>>>> probably situations where work needs to be broken down into smaller units,
>>>> making life better for both, contributor and reviewer(s). Ideally, every PR
>>>> would have only one commit, but that may be a bit much to mandate? Is the
>>>> general expectation something we need to document more clearly?
>>>>
>>>> Thanks,
>>>> Thomas




Re: [DISCUSS] Committer Guidelines / Hygene before merging PRs

2018-09-19 Thread Ankur Goenka
I agree about keeping the commit history clean.
"Fixup!", "Address comments", "Address even more comments" type of commits
do not convey meaningful information and are not very useful. It's a good
idea to squash them.
However, I think it's OK to keep separate commits for different logical
pieces of the code, which makes reviewing and revisiting code easier.
Example PR: Support X in the pipeline
Commit 1: Restructuring a bunch of code without any logical change.
Commit 2: Changing validation logic for pipeline.
Commit 3: Supporting new field "X" for pipeline.

On Wed, Sep 19, 2018 at 11:27 AM Charles Chen  wrote:

> To be concrete, it is very easy to revert a commit in any case:
>
>1. First, use "git log --first-parent" to find the first-parent commit
>corresponding to a PR merge (this is a one-to-one correspondence).
>2. Use "git revert -m 1 " to revert the commit; this selects
>the first parent as the base for a merge commit (in the case where a single
>commit needs to be reverted, just use "git revert " without the
>"-m 1" flag).
>
> In any case, as a general good engineering practice, I do agree that it is
> highly desirable to have small independent PRs instead of large jumbo PRs
> whenever possible.
>
> On Wed, Sep 19, 2018 at 11:20 AM Charles Chen  wrote:
>
>> I don't think it's actually harder to roll back a set of commits that are
>> merged together.  Git has the notion of first-parent commits (you can see,
>> for example, "git log --first-parent", which filters out the intermediate
>> commits).  In this sense, PRs still get merged as one unit and this is
>> preserved even if intermediate commits are kept.  Perhaps the bigger issue
>> is that we need better documentation and a playbook on how to do these
>> common tasks in git.
>>
>> On Wed, Sep 19, 2018 at 9:27 AM Thomas Weise  wrote:
>>
>>> Wanted to bring this up as reminder as well as opportunity to discuss
>>> potential changes to our committer guide. It has been a while since last
>>> related discussion and we welcomed several new committers since then.
>>>
>>> Finishing up pull requests pre-merge:
>>>
>>> https://beam.apache.org/contribute/committer-guide/#finishing-touches
>>>
>>> PRs are worked on over time and may accumulate many commits. Sometimes
>>> because scope expands, sometimes just to separate independent changes but
>>> most of the time the commits are just fixups that are added as review
>>> progresses.
>>>
>>> It is important that the latter get squashed prior to PR merge, as
>>> otherwise we lose the ability to roll back changes by reverting a single
>>> commit and also generally cause a lot of noise in the commit history that
>>> does not help other contributors. To be clear, I refer to the "Fixup!",
>>> "Address comments", "Address even more comments" type of entries :)
>>>
>>> I would also propose that every commit gets tagged with a JIRA (except
>>> those fixups that will be squashed). Having the JIRA and possibly other
>>> tags makes it easier for others not involved in the PR to identify changes
>>> after they were merged, for example when looking at the revision history or
>>> annotated source.
>>>
>>> As for other scenarios of jumbo PRs with many commits, there are
>>> probably situations where work needs to be broken down into smaller units,
>>> making life better for both, contributor and reviewer(s). Ideally, every PR
>>> would have only one commit, but that may be a bit much to mandate? Is the
>>> general expectation something we need to document more clearly?
>>>
>>> Thanks,
>>> Thomas
>>>
>>>


Re: Migrating Beam SQL to Calcite's code generation

2018-09-19 Thread Andrew Pilloud
To follow up on this, the PR is now in a reviewable state and I've added
more tests for FLOOR and CEIL. Both work with a more extensive set of
arguments after this change. There are now 4 outstanding Calcite PRs that
are needed to get all the tests passing.

Unfortunately there is no easy way to mix our current implementation with
Calcite's code generator.

Andrew

On Mon, Sep 17, 2018 at 3:22 PM Mingmin Xu  wrote:

> Awesome work, we should call Calcite operator functions if available.
>
> I haven't had time to read the PR yet; for those impacted, we would keep
> the existing implementation. One example: I noticed recently that FLOOR/CEIL
> only supports months/years, which is quite a surprise to me.
>
> Mingmin
>
> On Mon, Sep 17, 2018 at 3:03 PM Anton Kedin  wrote:
>
>> This is pretty amazing! Thank you for doing this!
>>
>> Regards,
>> Anton
>>
>> On Mon, Sep 17, 2018 at 2:27 PM Andrew Pilloud 
>> wrote:
>>
>>> I've adapted Calcite's EnumerableCalc code generation to generate the
>>> BeamCalc DoFn. The primary purpose behind this change is so we can take
>>> advantage of Calcite's extensive SQL operator implementation. This deletes
>>> ~11000 lines of code from Beam (with ~350 added), significantly increases
>>> the set of supported SQL operators, and improves performance and
>>> correctness of currently supported operators. Here is my work in progress:
>>> https://github.com/apache/beam/pull/6417
>>>
>>> There are a few bugs in Calcite that this has exposed:
>>>
>>> Fixed in Calcite master:
>>>
>>>- CALCITE-2321 - The type of a union of CHAR columns of different
>>>  lengths should be VARCHAR
>>>- CALCITE-2447 - Some POWER, ATAN2 functions fail with
>>>  NoSuchMethodException
>>>
>>> Pending PRs:
>>>
>>>- CALCITE-2529 - linq4j should promote integer to floating point when
>>>  generating function calls
>>>- CALCITE-2530 - TRIM function does not throw exception when the length
>>>  of trim character is not 1(one)
>>>
>>> More work:
>>>
>>>- CALCITE-2404 - Accessing structured-types is not implemented by the
>>>  runtime
>>>- (none yet) - Support multi character TRIM extension in Calcite
>>>
>>> I would like to push these changes in with these minor regressions. Do
>>> any of these Calcite bugs block this functionality being added to Beam?
>>>
>>> Andrew
>>>
>>
>
> --
> 
> Mingmin
>


Re: Rethinking Timers as PCollections

2018-09-19 Thread Reuven Lax
I believe that clearTimer has been a feature request before though.

On Wed, Sep 19, 2018 at 11:31 AM Lukasz Cwik  wrote:

> *How does modelling a timer as a PCollection help the Beam model?*
>
> The largest concern was about how to model timers within Apache Beam that:
> 1) removed the need for the watermark hold that is typically accompanied
> with state/timer implementations
> 2) enabled the ability to set the explicit output time to be independent
> of the firing time for all timer specifications [1]
>
> I felt as though treating timers as a self-loop around the ParDo
> PTransform allowed us to use the natural definition of output watermark =
> min(all input watermarks) as a way to define how timers hold output and
> using windowed values that contained timers as a natural way to represent
> the output time to be independent of the firing time. The purpose of the
> PCollection right now is to store the representation of how timers are
> encoded. I suspect that at some point in time we will have different timer
> encodings.
>
> Having this fit well with how timers are delivered between the SDK and
> Runner was an added bonus. Also, a good portion of the code that I needed
> to fix up was more related to the assumption that there was ever only a
> single input producer to an executable stage and plumbing of timer
> specifications through all the runner library support layers.
>
> --
> *There is no "clear" for timers.*
>
> The current Java API for timers only allows you to set them. Clearing
> timers is not exposed to users and is only used by internal implementations
> to support runners[2] via TimerInternals. Usage of a timer is like so:
>   @TimerId("timer")
>   private final TimerSpec timerSpec =
> TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>   @ProcessElement
>   public void process(
>   ProcessContext context,
>   BoundedWindow window,
>   @TimerId("timer") Timer myTimer) {
>
> myTimer.set(window.maxTimestamp().plus(allowedLateness));
>   }
>
> -
> I'm not a big fan of having timers as a separate field in the elements
> proto. I still think they should be treated as an input/output and we could
> update the representation so that inputs/outputs for PTransforms don't need
> to be "PCollections". I was thinking that our current PCollection
> representation assumes that we'll never want to change it to add extra
> information or do backwards incompatible changes like beam:pcollection:v2.
>
> -
> Other points:
> * side inputs already require a runner to introspect the ParDo payload to
> get the SideInputSpec, requiring it to have knowledge of the TimerSpec is
> no different.
> * multimap side input over timers where the key is the key that the timer
> is associated with. iterable side input over timers would allow you to
> iterate over  pairs. This could be useful for skew control in
> sources since they would want to know how far they are ahead vs other
> restrictions.
> * user state as a PCollection can make sense but I can't see how we can
> get past problems when we treat it as an "input" since the input watermark
> would be ignored or infinity?. I do agree that this could open the door to
> sharing "state" such as multi-key transactions but very speculative as you
> say.
>
>
> 1: https://issues.apache.org/jira/browse/BEAM-2535
> 2:
> https://github.com/apache/beam/search?q=%22org.apache.beam.sdk.state.Timers%22&unscoped_q=%22org.apache.beam.sdk.state.Timers%22
>
> On Wed, Sep 19, 2018 at 6:28 AM Thomas Weise  wrote:
>
>> Robert,
>>
>> Thanks for presenting these thoughts. Your attempt to implement the timer
>> support in the Python runner is the first strong signal we have and it is
>> the right time to make changes - AFAIK no other runner work has been done.
>>
>> I'm also a bit concerned about the acrobatics required in the PR to make
>> this work. Luke will be in the best position to comment, but as I recall we
>> considered modeling timers as special PCollections a simplification for SDK
>> <> Runner interaction and overall implementation. The special treatment
>> (and slight confusion) at the graph level perhaps was an early warning
>> sign, discovering the extra complexity wiring this in a runner should be a
>> reason to revisit.
>>
>> Conceptually timers are special state, they are certainly more state than
>> stream :) Regardless how they are passed to the harness, the runner will
>> need to treat them similar to side inputs and user state.
>>
>> Thanks,
>> Thomas
>>
>>
>>
>>
>> On Wed, Sep 19, 2018 at 3:33 AM Robert Bradshaw 
>> wrote:
>>
>>> TLDR Perhaps we should revisit
>>> https://s.apache.org/beam-portability-timers in light of the fact that
>>> Timers are more like State than PCollections.
>>>
>>> --
>>>
>>> While looking at implementing State and Timers in the Python SDK, I've
>>> been revisiting the ideas presented at
>>> https://s.apache.org/beam-portability-timers , and am now starting to
>>> wonder if this is actually the best way to model

Re: Rethinking Timers as PCollections

2018-09-19 Thread Lukasz Cwik
Short train of thoughts continues.

I do agree that something could change; I just don't believe that what has
been proposed so far gets us to something better yet.

On Wed, Sep 19, 2018 at 11:43 AM Lukasz Cwik  wrote:

> Clear can be modeled by a boolean state cell of process/ignore next timer
> firing. Not as good for watermark advancement though if you can eagerly
> clear something.
>
> Longer term you could retract a "timer" from a PCollection once
> retractions are supported.
>
> On Wed, Sep 19, 2018 at 11:40 AM Reuven Lax  wrote:
>
>> I believe that clearTimer has been a feature request before though.
>>
>> On Wed, Sep 19, 2018 at 11:31 AM Lukasz Cwik  wrote:
>>
>>> *How does modelling a timer as a PCollection help the Beam model?*
>>>
>>> The largest concern was about how to model timers within Apache Beam
>>> that:
>>> 1) removed the need for the watermark hold that is typically accompanied
>>> with state/timer implementations
>>> 2) enabled the ability to set the explicit output time to be independent
>>> of the firing time for all timer specifications [1]
>>>
>>> I felt as though treating timers as a self-loop around the ParDo
>>> PTransform allowed us to use the natural definition of output watermark =
>>> min(all input watermarks) as a way to define how timers hold output and
>>> using windowed values that contained timers as a natural way to represent
>>> the output time to be independent of the firing time. The purpose of the
>>> PCollection right now is to store the representation of how timers are
>>> encoded. I suspect that at some point in time we will have different timer
>>> encodings.
>>>
>>> Having this fit well with how timers are delivered between the SDK and
>>> Runner was an added bonus. Also, a good portion of the code that I needed
>>> to fix up was more related to the assumption that there was ever only a
>>> single input producer to an executable stage and plumbing of timer
>>> specifications through all the runner library support layers.
>>>
>>> --
>>> *There is no "clear" for timers.*
>>>
>>> The current Java API for timers only allows you to set them. Clearing
>>> timers is not exposed to users and is only used by internal implementations
>>> to support runners[2] via TimerInternals. Usage of a timer is like so:
>>>   @TimerId("timer")
>>>   private final TimerSpec timerSpec =
>>> TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>
>>>   @ProcessElement
>>>   public void process(
>>>   ProcessContext context,
>>>   BoundedWindow window,
>>>   @TimerId("timer") Timer myTimer) {
>>>
>>> myTimer.set(window.maxTimestamp().plus(allowedLateness));
>>>   }
>>>
>>> -
>>> I'm not a big fan of having timers as a separate field in the elements
>>> proto. I still think they should be treated as an input/output and we could
>>> update the representation so that inputs/outputs for PTransforms don't need
>>> to be "PCollections". I was thinking that our current PCollection
>>> representation assumes that we'll never want to change it to add extra
>>> information or do backwards incompatible changes like beam:pcollection:v2.
>>>
>>> -
>>> Other points:
>>> * side inputs already require a runner to introspect the ParDo payload
>>> to get the SideInputSpec, requiring it to have knowledge of the TimerSpec
>>> is no different.
>>> * multimap side input over timers where the key is the key that the
>>> timer is associated with. iterable side input over timers would allow you
>>> to iterate over  pairs. This could be useful for skew control
>>> in sources since they would want to know how far they are ahead vs other
>>> restrictions.
>>> * user state as a PCollection can make sense but I can't see how we can
>>> get past problems when we treat it as an "input" since the input watermark
>>> would be ignored or infinity?. I do agree that this could open the door to
>>> sharing "state" such as multi-key transactions but very speculative as you
>>> say.
>>>
>>>
>>> 1: https://issues.apache.org/jira/browse/BEAM-2535
>>> 2:
>>> https://github.com/apache/beam/search?q=%22org.apache.beam.sdk.state.Timers%22&unscoped_q=%22org.apache.beam.sdk.state.Timers%22
>>>
>>> On Wed, Sep 19, 2018 at 6:28 AM Thomas Weise  wrote:
>>>
>>>> Robert,
>>>>
>>>> Thanks for presenting these thoughts. Your attempt to implement the
>>>> timer support in the Python runner is the first strong signal we have and
>>>> it is the right time to make changes - AFAIK no other runner work has been
>>>> done.
>>>>
>>>> I'm also a bit concerned about the acrobatics required in the PR to
>>>> make this work. Luke will be in the best position to comment, but as I
>>>> recall we considered modeling timers as special PCollections a
>>>> simplification for SDK <> Runner interaction and overall implementation.
>>>> The special treatment (and slight confusion) at the graph level perhaps was
>>>> an early warning sign, discovering the extra complexity wiring this in a
>>>> runner should be a reason to revisit

Re: Rethinking Timers as PCollections

2018-09-19 Thread Lukasz Cwik
Clear can be modeled by a boolean state cell that says whether to process or
ignore the next timer firing. That is not as good for watermark advancement,
though, as being able to eagerly clear the timer.

Longer term you could retract a "timer" from a PCollection once retractions
are supported.
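
To make the boolean-cell idea concrete, here is a toy sketch in plain Python.
It is not the Beam API: ToyTimerFn, set_timer, clear_timer and advance_to are
hypothetical names invented for illustration, and the "runner" is just a method
that fires the timer when time passes its deadline.

```python
class ToyTimerFn:
    """Toy stand-in for a stateful DoFn with one timer (not the Beam API)."""

    def __init__(self):
        self.timer_at = None             # pending firing time, if any
        self.ignore_next_firing = False  # the boolean state cell
        self.fired = []                  # firing times that were processed

    def set_timer(self, when):
        self.timer_at = when
        self.ignore_next_firing = False

    def clear_timer(self):
        # The timer itself cannot be removed; we only record that its
        # next firing should be ignored.
        self.ignore_next_firing = True

    def advance_to(self, now):
        """Runner side: deliver the timer once time reaches its deadline."""
        if self.timer_at is not None and now >= self.timer_at:
            when, self.timer_at = self.timer_at, None
            self.on_timer(when)

    def on_timer(self, when):
        if self.ignore_next_firing:
            self.ignore_next_firing = False  # swallow this firing only
            return
        self.fired.append(when)


fn = ToyTimerFn()
fn.set_timer(10)
fn.clear_timer()
still_pending = fn.timer_at  # the "cleared" timer is still scheduled
fn.advance_to(10)            # fires, but is ignored
fn.set_timer(20)
fn.advance_to(25)            # fires and is processed
print(still_pending, fn.fired)
```

Note that after clear_timer() the timer is still pending until its deadline,
which is the watermark drawback mentioned above: whatever the timer holds back
stays held until the ignored firing actually happens.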

On Wed, Sep 19, 2018 at 11:40 AM Reuven Lax  wrote:

> I believe that clearTimer has been a feature request before though.
>
> On Wed, Sep 19, 2018 at 11:31 AM Lukasz Cwik  wrote:
>
>> *How does modelling a timer as a PCollection help the Beam model?*
>>
>> The largest concern was about how to model timers within Apache Beam that:
>> 1) removed the need for the watermark hold that is typically accompanied
>> with state/timer implementations
>> 2) enabled the ability to set the explicit output time to be independent
>> of the firing time for all timer specifications [1]
>>
>> I felt as though treating timers as a self-loop around the ParDo
>> PTransform allowed us to use the natural definition of output watermark =
>> min(all input watermarks) as a way to define how timers hold output and
>> using windowed values that contained timers as a natural way to represent
>> the output time to be independent of the firing time. The purpose of the
>> PCollection right now is to store the representation of how timers are
>> encoded. I suspect that at some point in time we will have different timer
>> encodings.
>>
>> Having this fit well with how timers are delivered between the SDK and
>> Runner was an added bonus. Also, a good portion of the code that I needed
>> to fix up was more related to the assumption that there was ever only a
>> single input producer to an executable stage and plumbing of timer
>> specifications through all the runner library support layers.
>>
>> --
>> *There is no "clear" for timers.*
>>
>> The current Java API for timers only allows you to set them. Clearing
>> timers is not exposed to users and is only used by internal implementations
>> to support runners[2] via TimerInternals. Usage of a timer is like so:
>>   @TimerId("timer")
>>   private final TimerSpec timerSpec =
>> TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>>   @ProcessElement
>>   public void process(
>>   ProcessContext context,
>>   BoundedWindow window,
>>   @TimerId("timer") Timer myTimer) {
>>
>> myTimer.set(window.maxTimestamp().plus(allowedLateness));
>>   }
>>
>> -
>> I'm not a big fan of having timers as a separate field in the elements
>> proto. I still think they should be treated as an input/output and we could
>> update the representation so that inputs/outputs for PTransforms don't need
>> to be "PCollections". I was thinking that our current PCollection
>> representation assumes that we'll never want to change it to add extra
>> information or do backwards incompatible changes like beam:pcollection:v2.
>>
>> -
>> Other points:
>> * side inputs already require a runner to introspect the ParDo payload to
>> get the SideInputSpec, requiring it to have knowledge of the TimerSpec is
>> no different.
>> * multimap side input over timers where the key is the key that the timer
>> is associated with. iterable side input over timers would allow you to
>> iterate over  pairs. This could be useful for skew control in
>> sources since they would want to know how far they are ahead vs other
>> restrictions.
>> * user state as a PCollection can make sense but I can't see how we can
>> get past problems when we treat it as an "input" since the input watermark
>> would be ignored or infinity?. I do agree that this could open the door to
>> sharing "state" such as multi-key transactions but very speculative as you
>> say.
>>
>>
>> 1: https://issues.apache.org/jira/browse/BEAM-2535
>> 2:
>> https://github.com/apache/beam/search?q=%22org.apache.beam.sdk.state.Timers%22&unscoped_q=%22org.apache.beam.sdk.state.Timers%22
>>
>> On Wed, Sep 19, 2018 at 6:28 AM Thomas Weise  wrote:
>>
>>> Robert,
>>>
>>> Thanks for presenting these thoughts. Your attempt to implement the
>>> timer support in the Python runner is the first strong signal we have and
>>> it is the right time to make changes - AFAIK no other runner work has been
>>> done.
>>>
>>> I'm also a bit concerned about the acrobatics required in the PR to make
>>> this work. Luke will be in the best position to comment, but as I recall we
>>> considered modeling timers as special PCollections a simplification for SDK
>>> <> Runner interaction and overall implementation. The special treatment
>>> (and slight confusion) at the graph level perhaps was an early warning
>>> sign, discovering the extra complexity wiring this in a runner should be a
>>> reason to revisit.
>>>
>>> Conceptually timers are special state, they are certainly more state
>>> than stream :) Regardless how they are passed to the harness, the runner
>>> will need to treat them similar to side inputs and user state.
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>>
>>>
>>> On Wed, Sep 19, 2018 at 3:33 AM Robert Bradshaw 
>>> wrote:
>>>

Re: Rethinking Timers as PCollections

2018-09-19 Thread Lukasz Cwik
*How does modelling a timer as a PCollection help the Beam model?*

The largest concern was about how to model timers within Apache Beam that:
1) removed the need for the watermark hold that is typically accompanied
with state/timer implementations
2) enabled the ability to set the explicit output time to be independent of
the firing time for all timer specifications [1]

I felt as though treating timers as a self-loop around the ParDo PTransform
allowed us to use the natural definition of output watermark = min(all
input watermarks) as a way to define how timers hold output and using
windowed values that contained timers as a natural way to represent the
output time to be independent of the firing time. The purpose of the
PCollection right now is to store the representation of how timers are
encoded. I suspect that at some point in time we will have different timer
encodings.
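
As a rough illustration of the watermark argument, here is a small Python
sketch. The input names and timestamps are made up, and treating the timer
self-loop as an input whose watermark is the earliest pending timer's output
time is an assumption of this sketch, not a description of any runner.

```python
import math


def output_watermark(input_watermarks):
    """Output watermark = min over all input watermarks."""
    # With no inputs nothing holds the output back, so use +infinity.
    return min(input_watermarks.values(), default=math.inf)


# Hypothetical event-time timestamps (e.g. seconds) for one ParDo.
inputs = {
    "main_input": 120,
    # Assumption: the self-loop's watermark is the earliest pending timer's
    # output time, so a pending timer holds the output watermark back.
    "timer_self_loop": 100,
}
held = output_watermark(inputs)

# Once the timer has fired, the self-loop no longer holds anything.
inputs["timer_self_loop"] = math.inf
released = output_watermark(inputs)
print(held, released)
```

The point is only that no separate watermark-hold mechanism appears: the
pending timer holds the output simply by being one more input to the min.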

Having this fit well with how timers are delivered between the SDK and
Runner was an added bonus. Also, a good portion of the code that I needed
to fix up was more related to the assumption that there was ever only a
single input producer to an executable stage and plumbing of timer
specifications through all the runner library support layers.

--
*There is no "clear" for timers.*

The current Java API for timers only allows you to set them. Clearing
timers is not exposed to users and is only used by internal implementations
to support runners[2] via TimerInternals. Usage of a timer is like so:
  @TimerId("timer")
  private final TimerSpec timerSpec =
      TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext context,
      BoundedWindow window,
      @TimerId("timer") Timer myTimer) {
    myTimer.set(window.maxTimestamp().plus(allowedLateness));
  }

-
I'm not a big fan of having timers as a separate field in the elements
proto. I still think they should be treated as an input/output and we could
update the representation so that inputs/outputs for PTransforms don't need
to be "PCollections". I was thinking that our current PCollection
representation assumes that we'll never want to change it to add extra
information or do backwards incompatible changes like beam:pcollection:v2.

-
Other points:
* side inputs already require a runner to introspect the ParDo payload to
get the SideInputSpec, requiring it to have knowledge of the TimerSpec is
no different.
* multimap side input over timers where the key is the key that the timer
is associated with. iterable side input over timers would allow you to
iterate over  pairs. This could be useful for skew control in
sources since they would want to know how far they are ahead vs other
restrictions.
* user state as a PCollection can make sense but I can't see how we can get
past problems when we treat it as an "input" since the input watermark
would be ignored or infinity?. I do agree that this could open the door to
sharing "state" such as multi-key transactions but very speculative as you
say.


1: https://issues.apache.org/jira/browse/BEAM-2535
2:
https://github.com/apache/beam/search?q=%22org.apache.beam.sdk.state.Timers%22&unscoped_q=%22org.apache.beam.sdk.state.Timers%22

On Wed, Sep 19, 2018 at 6:28 AM Thomas Weise  wrote:

> Robert,
>
> Thanks for presenting these thoughts. Your attempt to implement the timer
> support in the Python runner is the first strong signal we have and it is
> the right time to make changes - AFAIK no other runner work has been done.
>
> I'm also a bit concerned about the acrobatics required in the PR to make
> this work. Luke will be in the best position to comment, but as I recall we
> considered modeling timers as special PCollections a simplification for SDK
> <> Runner interaction and overall implementation. The special treatment
> (and slight confusion) at the graph level perhaps was an early warning
> sign, discovering the extra complexity wiring this in a runner should be a
> reason to revisit.
>
> Conceptually timers are special state, they are certainly more state than
> stream :) Regardless how they are passed to the harness, the runner will
> need to treat them similar to side inputs and user state.
>
> Thanks,
> Thomas
>
>
>
>
> On Wed, Sep 19, 2018 at 3:33 AM Robert Bradshaw 
> wrote:
>
>> TLDR Perhaps we should revisit
>> https://s.apache.org/beam-portability-timers in light of the fact that
>> Timers are more like State than PCollections.
>>
>> --
>>
>> While looking at implementing State and Timers in the Python SDK, I've
>> been revisiting the ideas presented at
>> https://s.apache.org/beam-portability-timers , and am now starting to
>> wonder if this is actually the best way to model things (at least at the
>> Runner level). Instead it seems Timers more closely resemble, and are more
>> tightly bound to, State than PCollections.
>>
>> This is especially clear when writing timers. These timers are not a bag
>> of emitted elements, rather one sets (and clears) timers and the set of

Re: [DISCUSS] Committer Guidelines / Hygiene before merging PRs

2018-09-19 Thread Charles Chen
To be concrete, it is very easy to revert a commit in any case:

   1. First, use "git log --first-parent" to find the first-parent commit
   corresponding to a PR merge (this is a one-to-one correspondence).
   2. Use "git revert -m 1 <commit>" to revert the commit; this selects
   the first parent as the base for a merge commit (in the case where a single
   commit needs to be reverted, just use "git revert <commit>" without the
   "-m 1" flag).

In any case, as a general good engineering practice, I do agree that it is
highly desirable to have small independent PRs instead of large jumbo PRs
whenever possible.

On Wed, Sep 19, 2018 at 11:20 AM Charles Chen  wrote:

> I don't think it's actually harder to roll back a set of commits that are
> merged together.  Git has the notion of first-parent commits (you can see,
> for example, "git log --first-parent", which filters out the intermediate
> commits).  In this sense, PRs still get merged as one unit and this is
> preserved even if intermediate commits are kept.  Perhaps the bigger issue
> is that we need better documentation and a playbook on how to do these
> common tasks in git.
>
> On Wed, Sep 19, 2018 at 9:27 AM Thomas Weise  wrote:
>
>> Wanted to bring this up as a reminder as well as an opportunity to discuss
>> potential changes to our committer guide. It has been a while since the last
>> related discussion and we have welcomed several new committers since then.
>>
>> Finishing up pull requests pre-merge:
>>
>> https://beam.apache.org/contribute/committer-guide/#finishing-touches
>>
>> PRs are worked on over time and may accumulate many commits. Sometimes
>> because scope expands, sometimes just to separate independent changes but
>> most of the time the commits are just fixups that are added as review
>> progresses.
>>
>> It is important that the latter get squashed prior to PR merge, as
>> otherwise we lose the ability to roll back changes by reverting a single
>> commit and also generally cause a lot of noise in the commit history that
>> does not help other contributors. To be clear, I refer to the "Fixup!",
>> "Address comments", "Address even more comments" type of entries :)
>>
>> I would also propose that every commit gets tagged with a JIRA (except
>> those fixups that will be squashed). Having the JIRA and possibly other
>> tags makes it easier for others not involved in the PR to identify changes
>> after they were merged, for example when looking at the revision history or
>> annotated source.
>>
>> As for other scenarios of jumbo PRs with many commits, there are probably
>> situations where work needs to be broken down into smaller units, making
>> life better for both contributor and reviewer(s). Ideally, every PR would
>> have only one commit, but that may be a bit much to mandate? Is the general
>> expectation something we need to document more clearly?
>>
>> Thanks,
>> Thomas
>>
>>


Re: [DISCUSS] Committer Guidelines / Hygiene before merging PRs

2018-09-19 Thread Charles Chen
I don't think it's actually harder to roll back a set of commits that are
merged together.  Git has the notion of first-parent commits (you can see,
for example, "git log --first-parent", which filters out the intermediate
commits).  In this sense, PRs still get merged as one unit and this is
preserved even if intermediate commits are kept.  Perhaps the bigger issue
is that we need better documentation and a playbook on how to do these
common tasks in git.

On Wed, Sep 19, 2018 at 9:27 AM Thomas Weise  wrote:

> Wanted to bring this up as a reminder as well as an opportunity to discuss
> potential changes to our committer guide. It has been a while since the last
> related discussion and we have welcomed several new committers since then.
>
> Finishing up pull requests pre-merge:
>
> https://beam.apache.org/contribute/committer-guide/#finishing-touches
>
> PRs are worked on over time and may accumulate many commits. Sometimes
> because scope expands, sometimes just to separate independent changes but
> most of the time the commits are just fixups that are added as review
> progresses.
>
> It is important that the latter get squashed prior to PR merge, as
> otherwise we lose the ability to roll back changes by reverting a single
> commit and also generally cause a lot of noise in the commit history that
> does not help other contributors. To be clear, I refer to the "Fixup!",
> "Address comments", "Address even more comments" type of entries :)
>
> I would also propose that every commit gets tagged with a JIRA (except
> those fixups that will be squashed). Having the JIRA and possibly other
> tags makes it easier for others not involved in the PR to identify changes
> after they were merged, for example when looking at the revision history or
> annotated source.
>
> As for other scenarios of jumbo PRs with many commits, there are probably
> situations where work needs to be broken down into smaller units, making
> life better for both contributor and reviewer(s). Ideally, every PR would
> have only one commit, but that may be a bit much to mandate? Is the general
> expectation something we need to document more clearly?
>
> Thanks,
> Thomas
>
>


[DISCUSS] Committer Guidelines / Hygiene before merging PRs

2018-09-19 Thread Thomas Weise
Wanted to bring this up as a reminder as well as an opportunity to discuss
potential changes to our committer guide. It has been a while since the last
related discussion and we have welcomed several new committers since then.

Finishing up pull requests pre-merge:

https://beam.apache.org/contribute/committer-guide/#finishing-touches

PRs are worked on over time and may accumulate many commits. Sometimes
because scope expands, sometimes just to separate independent changes but
most of the time the commits are just fixups that are added as review
progresses.

It is important that the latter get squashed prior to PR merge, as
otherwise we lose the ability to roll back changes by reverting a single
commit and also generally cause a lot of noise in the commit history that
does not help other contributors. To be clear, I refer to the "Fixup!",
"Address comments", "Address even more comments" type of entries :)

I would also propose that every commit gets tagged with a JIRA (except
those fixups that will be squashed). Having the JIRA and possibly other
tags makes it easier for others not involved in the PR to identify changes
after they were merged, for example when looking at the revision history or
annotated source.

As for other scenarios of jumbo PRs with many commits, there are probably
situations where work needs to be broken down into smaller units, making
life better for both contributor and reviewer(s). Ideally, every PR would
have only one commit, but that may be a bit much to mandate? Is the general
expectation something we need to document more clearly?

Thanks,
Thomas


Re: Proposal for Beam Python User State and Timer APIs

2018-09-19 Thread Pablo Estrada
Very cool! Thanks Charles!

On Tue, Sep 18, 2018, 9:56 AM Charles Chen  wrote:

> An update: the reference DirectRunner implementation of (and common
> execution code for) the Python user state and timers API has been merged:
> https://github.com/apache/beam/pull/6304
>
> On Thu, Aug 30, 2018 at 1:48 AM Charles Chen  wrote:
>
>> Another update: the reference DirectRunner implementation of the Python
>> user state and timers API is out for review:
>> https://github.com/apache/beam/pull/6304
>>
>> On Mon, Jul 9, 2018 at 2:18 PM Charles Chen  wrote:
>>
>>> An update: https://github.com/apache/beam/pull/5691 has been merged.  I
>>> hope to send out a reference implementation in the DirectRunner soon.  On
>>> the roadmap after that is work on the relevant portability interfaces here
>>> so we can get this working on runners like Beam Python on Flink.
>>>
>>> On Wed, Jun 20, 2018 at 10:00 AM Charles Chen  wrote:
>>>
>>>> An update on the implementation: I recently sent the user-facing
>>>> pipeline construction part of the API implementation out for review:
>>>> https://github.com/apache/beam/pull/5691.
>>>>
>>>> On Tue, Jun 5, 2018 at 5:26 PM Charles Chen  wrote:
>>>>
>>>>> Thanks everyone for contributing here.  We've reached rough consensus
>>>>> on the approach we should take with this API, and I've summarized this in
>>>>> the new "Community consensus" sections I added to the doc (
>>>>> https://s.apache.org/beam-python-user-state-and-timers).  I will
>>>>> begin initial implementation of this API soon.
>>>>>
>>>>> On Wed, May 23, 2018 at 8:08 PM Thomas Weise  wrote:
>>>>>
>>>>>> Nice proposal; it's exciting to see this about to be added to the SDK
>>>>>> as it enables a set of more complex use cases.
>>>>>>
>>>>>> I also think that some of the content can later be repurposed as user
>>>>>> documentation.
>>>>>>
>>>>>> Thanks,
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>> On Wed, May 23, 2018 at 11:49 AM, Charles Chen 
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks everyone for the detailed comments and discussions.  It looks
>>>>>>> like by now, we mostly agree with the requirements and overall direction
>>>>>>> needed for the API, though there is continuing discussion on specific
>>>>>>> details.  I want to highlight two new sections of the doc, which address
>>>>>>> some discussions that have come up:
>>>>>>>
>>>>>>>    - *Existing state and transactionality*: this section addresses
>>>>>>>    how we will address an existing transactionality inconsistency in the
>>>>>>>    existing Java API.  (
>>>>>>>    https://docs.google.com/document/d/1GadEkAmtbJQjmqiqfSzGw3b66TKerm8tyn6TK4blAys/edit#heading=h.ofyl9jspiz3b
>>>>>>>    )
>>>>>>>    - *State for merging windows*: this section addresses how we
>>>>>>>    will deal with non-combinable state in conjunction with merging
>>>>>>>    windows.  (
>>>>>>>    https://docs.google.com/document/d/1GadEkAmtbJQjmqiqfSzGw3b66TKerm8tyn6TK4blAys/edit#heading=h.ctxkcgabtzpy
>>>>>>>    )
>>>>>>>
>>>>>>> Let me know any further comments and suggestions.
>>>>>>>
>>>>>>> On Tue, May 22, 2018 at 9:29 AM Kenneth Knowles 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Nice. I know that Java users have found it helpful to have this
>>>>>>>> lower-level way of writing pipelines when the high-level primitives
>>>>>>>> don't quite have the tight control they are looking for. I hope it
>>>>>>>> will be a big draw for Python, too.
>>>>>>>>
>>>>>>>> (commenting on the doc)
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Mon, May 21, 2018 at 5:15 PM Charles Chen 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I want to share a proposal for adding user state and timer support
>>>>>>>>> to the Beam Python SDK and get the community's thoughts on how such
>>>>>>>>> an API should look:
>>>>>>>>> https://s.apache.org/beam-python-user-state-and-timers
>>>>>>>>>
>>>>>>>>> Let me know what you think and please add any comments and
>>>>>>>>> suggestions you may have.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Charles
>>>>>>>>>
>>>>>>>>
>>>>>>


Re: [VOTE] Release 2.7.0, release candidate #1

2018-09-19 Thread Romain Manni-Bucau
Quick update on the Spark issue: I didn't get enough time to identify it
clearly, but I managed to get a passing run of my test by changing a bunch of
versions.
I suspect my code triggers a class conflict between Spark and my shaded
jar, leading to a serialization issue. I didn't test the userClassPathFirst
option of Spark, but it could be an interesting thing to enable in the Beam
runner. However, it is still very confusing that it stops running just from
upgrading the Beam version, and the Spark error is very hard to understand.

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn | Book


On Tue, Sep 18, 2018 at 20:17, Lukasz Cwik  wrote:

> Romain hinted that this was a dependency issue but when comparing the two
> dependency trees I don't get much of a difference:
>
> lcwik@lcwik0: ~$ diff /tmp/260 /tmp/270
> < [INFO] +- org.apache.beam:beam-runners-spark:jar:2.6.0:compile
> < [INFO] |  +- org.apache.beam:beam-model-pipeline:jar:2.6.0:compile
> ---
> > [INFO] +- org.apache.beam:beam-runners-spark:jar:2.7.0:compile
> > [INFO] |  +- org.apache.beam:beam-model-pipeline:jar:2.7.0:compile
> 5c6
> < [INFO] |  +- org.apache.beam:beam-sdks-java-core:jar:2.6.0:compile
> ---
> > [INFO] |  +- org.apache.beam:beam-sdks-java-core:jar:2.7.0:compile
> 14,18c15,19
> < [INFO] |  |  \- org.tukaani:xz:jar:1.5:compile
> < [INFO] |  +-
> org.apache.beam:beam-runners-core-construction-java:jar:2.6.0:compile
> < [INFO] |  |  \-
> org.apache.beam:beam-model-job-management:jar:2.6.0:compile
> < [INFO] |  +- org.apache.beam:beam-runners-core-java:jar:2.6.0:compile
> < [INFO] |  |  \- org.apache.beam:beam-model-fn-execution:jar:2.6.0:compile
> ---
> > [INFO] |  |  \- org.tukaani:xz:jar:1.8:compile
> > [INFO] |  +-
> org.apache.beam:beam-runners-core-construction-java:jar:2.7.0:compile
> > [INFO] |  |  \-
> org.apache.beam:beam-model-job-management:jar:2.7.0:compile
> > [INFO] |  +- org.apache.beam:beam-runners-core-java:jar:2.7.0:compile
> > [INFO] |  |  \- org.apache.beam:beam-model-fn-execution:jar:2.7.0:compile
>
> Other than Beam package changes, the only other change is xz, which I don't
> believe could be causing the issue.
>
> On Tue, Sep 18, 2018 at 8:38 AM Jean-Baptiste Onofré 
> wrote:
>
>> Thanks, let me take a look.
>>
>> Regards
>> JB
>>
>> On 18/09/2018 17:36, Romain Manni-Bucau wrote:
>> >
>> >
>> >
>> > On Tue, Sep 18, 2018 at 16:44, Jean-Baptiste Onofré wrote:
>> >
>> > Hi,
>> >
>> > I don't have the issue ;)
>> >
>> > As said in my vote, I tested 2.7.0 RC1 on beam-samples with Spark
>> > without problem.
>> >
>> > I can't reproduce Romain's issue either.
>> >
>> > @Romain can you provide some details to reproduce the issue ?
>> >
>> >
>> > Sure, you can use this
>> > reproducer: https://github.com/rmannibucau/beam-2.7.0-fails
>> > It shows that it succeeds on 2.6 and fails on 2.7.
>> >
>> >
>> >
>> > Regards
>> > JB
>> >
>> > On 17/09/2018 19:17, Charles Chen wrote:
>> > > Luke, Maximillian, Raghu, can you please propose cherry-pick PRs to the
>> > > release-2.7.0 for your issues and add me as a reviewer (@charlesccychen)?
>> > >
>> > > Romain, JB: is there any way I can help with debugging the issue you're
>> > > facing so we can unblock the release?
>> > >
>> > > On Fri, Sep 14, 2018 at 1:49 PM Raghu Angadi wrote:
>> > >
>> > > I would like to propose one more cherry-pick for RC2:
>> > > https://github.com/apache/beam/pull/6391
>> > > This is a KafkaIO bug fix. Once a user hits this bug, there is no
>> > > easy workaround for them, especially on Dataflow. The only
>> > > workaround in Dataflow is to restart or reload the job.
>> > >
>> > > The fix itself is fairly safe and is tested.
>> > > Raghu.
>> > >
>> > > On Fri, Sep 14, 2018 at 12:52 AM Alexey Romanenko wrote:
>> > >
>> > > Perhaps it could help, but I ran a simple WordCount (built with
>> > > Beam 2.7) on a YARN/Spark (HDP Sandbox) cluster and it worked fine
>> > > for me.
>> > >
>> > >> On 14 Sep 2018, at 06:56, Romain Manni-Bucau wrote:
>> > >>
>> > >> Hi Charles,
>> > >>
>> > >> I didn't get enough time to check deeply but it is
>> clearly a
>>

Re: Rethinking Timers as PCollections

2018-09-19 Thread Thomas Weise
Robert,

Thanks for presenting these thoughts. Your attempt to implement the timer
support in the Python runner is the first strong signal we have and it is
the right time to make changes - AFAIK no other runner work has been done.

I'm also a bit concerned about the acrobatics required in the PR to make
this work. Luke will be in the best position to comment, but as I recall we
considered modeling timers as special PCollections to be a simplification for
SDK <> Runner interaction and overall implementation. The special treatment
(and slight confusion) at the graph level was perhaps an early warning
sign; discovering the extra complexity of wiring this into a runner should be
a reason to revisit.

Conceptually timers are special state; they are certainly more state than
stream :) Regardless of how they are passed to the harness, the runner will
need to treat them similarly to side inputs and user state.

Thanks,
Thomas




On Wed, Sep 19, 2018 at 3:33 AM Robert Bradshaw  wrote:

> TLDR Perhaps we should revisit
> https://s.apache.org/beam-portability-timers in light of the fact that
> Timers are more like State than PCollections.
>
> --
>
> While looking at implementing State and Timers in the Python SDK, I've
> been revisiting the ideas presented at
> https://s.apache.org/beam-portability-timers , and am now starting to
> wonder if this is actually the best way to model things (at least at the
> Runner level). Instead it seems Timers more closely resemble, and are more
> tightly bound to, State than PCollections.
>
> This is especially clear when writing timers. These timers are not a bag
> of emitted elements; rather, one sets (and clears) timers, and the set of
> timers that end up firing is a result of this *ordered* sequence of
> operations. It is also often important that the setting of timers be
> ordered with respect to the setting and clearing of state itself (and is
> more often than not collocated with such requests).
>
> In addition, these self-loops add complexity to the graph but provide no
> additional information--they are entirely redundant with the timerspecs
> already present on DoFns. Generally I prefer less redundancy in the spec,
> rather than have it be over-constrained. It's unclear what a runner that
> didn't introspect the DoFn's TimerSpecs would do with these special
> edges, and also unclear how they would differ from possible self-loops due
> to more traditional iteration.
>
> The primary motivation to express timers in this way seems to be the
> desire to push them to workers using the data plane, rather than inventing
> another mechanism or making them pull-based like with state. I think this
> could be done by simply adding a Timer field to the Elements (or Data)
> proto. (Note that this is not the same as having hacky ElementOrTimer
> elements flow through the graph.) Writes would be state requests, and
> perhaps it would even make sense to "read" the current value of an unfired
> timer over the state API, to be able to set things like
> {min,max}(new_timestamp, old_timestamp).
>
> (We could alternatively attempt to model State(s) as a PCollection(s), but
> this is more speculative and would likely exacerbate some of the issues
> above (though it could open the door for DoFns that somehow *share* state).
> They seem like different objects though, one is a mutable store, the other
> an immutable stream.)
>
> I realize this is a big shift, but we could probably adapt the existing
> Python/Java implementations fairly easily (and it would probably simplify
> them). And it's easier to do simplifications like this sooner rather than
> later.
>
> What do people think about this? Any obvious (or not-so-obvious) downsides
> that I'm missing?
>
> - Robert
>
>


***UNCHECKED*** Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-09-19 Thread Robert Bradshaw
On Fri, Sep 14, 2018 at 3:01 PM Thomas Weise  wrote:

> That's actually how the Flink runner already works - bundle processing
> starts when elements are available (see FlinkExecutableStageFunction for
> batch mode).
>
> But we still have the possibility of the SDK getting concurrent requests
> due to parallelism (and pipelined execution).
>

Concurrent requests should be fine, but are there cases (in batch) where a
bundle that was started cannot finish without other bundles finishing
first? What pipelining would we see if the graph is of the form
ExecutableStage - GBK - ExecutableStage - GBK - ... (or is it not always a
digraph of this form, possibly with branching)?


>
> Thanks,
> Thomas
>
>
> On Fri, Sep 14, 2018 at 2:56 AM Robert Bradshaw 
> wrote:
>
>> Currently the best solution we've come up with is that we must process an
>> unbounded number of bundles concurrently to avoid deadlock. Especially in
>> the batch case, this may be wasteful as we bring up workers for many stages
>> that are not actually executable until upstream stages finish. Since it may
>> be invasive to require runners to only schedule stages that can be actively
>> worked on, I've been thinking about what we could do in the common runner
>> libraries themselves. One idea is to postpone the actual sending of a
>> process bundle request until there is data on the channel to consume. With
>> Reads as Impulses, and triggers as data, all bundles are driven by some
>> input.
>>
>> This would mean we never ask the SDK to process bundles it cannot
>> immediately start working on. There is still the open question of whether
>> being able to *start* a bundle implies that one is able to *finish* a
>> bundle (i.e. do any runners start bundles and then block, pending other
>> bundle completion, before closing the data channel (though clearly a runner
>> can chop a bundle off at any point if it wants)).
>>
>> Does this approach sound feasible?
>>
>>
>> On Thu, Aug 30, 2018 at 2:54 AM Ankur Goenka  wrote:
>>
>>> I managed to write a small document based on the discussion.
>>> Please take a look at
>>> https://docs.google.com/document/d/1oAXVPbJ0dzj2_8LXEWFAgqCP5Tpld3q5B3QU254PQ6A/edit?usp=sharing
>>>
>>>
>>> On Tue, Aug 21, 2018 at 11:01 PM Henning Rohde 
>>> wrote:
>>>
>>>> Sending bundles that cannot be executed, i.e., the situation described
>>>> to cause deadlock in Flink in the beginning of the thread with mapB. The
>>>> discussion of exposing (or assuming an infinitely large) concurrency level
>>>> -- while a useful concept in its own right -- came around as a way to
>>>> unblock mapB.
>>>>
>>>> On Tue, Aug 21, 2018 at 2:16 PM Lukasz Cwik  wrote:
>>>>
>>>>> Henning, can you clarify what you mean by sending non-executable
>>>>> bundles to the SDK harness and how it is useful for Flink?
>>>>>
>>>>> On Tue, Aug 21, 2018 at 2:01 PM Henning Rohde 
>>>>> wrote:
>>>>>
>>>>>> I think it will be useful to the runner to know upfront what the
>>>>>> fundamental threading capabilities are for the SDK harness (say, "fixed",
>>>>>> "linear", "dynamic", ..) so that the runner can upfront make a good
>>>>>> static decision on #harnesses and how many resources they should each
>>>>>> have. It's wasteful to give the Foo SDK a whole many-core machine with
>>>>>> TBs of memory, if it can only support a single bundle at a time. I think
>>>>>> this is also in line with what Thomas and Luke are suggesting.
>>>>>>
>>>>>> However, it still seems to me to be a semantically problematic idea
>>>>>> to send non-executable bundles to the SDK harness. I understand it's
>>>>>> useful for Flink, but is that really the best path forward?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Aug 20, 2018 at 5:44 PM Ankur Goenka 
>>>>>> wrote:
>>>>>>
>>>>>>> That's right.
>>>>>>> To add to it: we added multi-threading to Python streaming, as a
>>>>>>> single thread is suboptimal for the streaming use case.
>>>>>>> Shall we move towards a conclusion on the SDK bundle processing
>>>>>>> upper bound?
>>>>>>>
>>>>>>> On Mon, Aug 20, 2018 at 1:54 PM Lukasz Cwik 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Ankur, I can see where you are going with your argument. I believe
>>>>>>>> there is certain information which is static and won't change at
>>>>>>>> pipeline creation time (such as Python SDK is most efficient doing one
>>>>>>>> bundle at a time) and some stuff which is best at runtime, like memory
>>>>>>>> and CPU limits, worker count.
>>>>>>>>
>>>>>>>> On Mon, Aug 20, 2018 at 1:47 PM Ankur Goenka 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I would prefer to keep it dynamic as it can be changed by the
>>>>>>>>> infrastructure or the pipeline author.
>>>>>>>>> Like in the case of Python, the number of concurrent bundles can be
>>>>>>>>> changed by setting the pipeline option worker_count. And for Java it
>>>>>>>>> can be computed based on the cpus on the machine.
>>>>>>>>>
>>>>>>>>> For Flink runner, we can use the worker_count parameter for now to
>>>>>>>>> inc

Re: Tracking and resolving release blocking bugs

2018-09-19 Thread Ismaël Mejía
We have done this so far by leaving the JIRA issues 'Open' with the
'Fix version' corresponding to the upcoming release, and we track the
progress between the branch cut and the first release candidate with
the assigned parties. This process has been the de facto standard for a
long time and has worked smoothly so far. More info here:

https://beam.apache.org/contribute/release-guide/#triage-release-blocking-issues-in-jira

Is there something missing? Or do you have other ideas in mind to
improve it?

On Wed, Sep 19, 2018 at 2:34 AM Connell O'Callaghan  wrote:
>
> Hi All
>
> In order to allow successful and smooth deployment of the latest BEAM
> releases, is the community OK with us tracking bugs blocking releases, with a
> goal to resolve such bugs within a week? If there is general agreement (or no
> major objections) on this we will edit the contributor page using similar
> language to the "Stale pull requests" section early next week.
>
> Thank you all,
> - Connell


Rethinking Timers as PCollections

2018-09-19 Thread Robert Bradshaw
TLDR Perhaps we should revisit https://s.apache.org/beam-portability-timers
in light of the fact that Timers are more like State than PCollections.

--

While looking at implementing State and Timers in the Python SDK, I've been
revisiting the ideas presented at
https://s.apache.org/beam-portability-timers , and am now starting to
wonder if this is actually the best way to model things (at least at the
Runner level). Instead it seems Timers more closely resemble, and are more
tightly bound to, State than PCollections.

This is especially clear when writing timers. These timers are not a bag of
emitted elements; rather, one sets (and clears) timers, and the set of timers
that end up firing is a result of this *ordered* sequence of operations.
It is also often important that the setting of timers be ordered with
respect to the setting and clearing of state itself (and is more often than
not collocated with such requests).
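
To illustrate the ordering point, here is a minimal sketch in plain Python.
The TimerStore class and the apply_ops helper are hypothetical names made up
for this illustration; they are not Beam APIs. The sketch only shows that
replaying the same set/clear operations in a different order leaves a
different set of pending timers, which an unordered bag of elements could not
express.

```python
# Illustrative sketch only: TimerStore and apply_ops are hypothetical,
# not part of the Beam API.
class TimerStore:
    """Mutable per-key timer state: the last operation on a timer id wins."""

    def __init__(self):
        self._timers = {}  # timer id -> firing timestamp

    def set(self, timer_id, timestamp):
        self._timers[timer_id] = timestamp

    def clear(self, timer_id):
        self._timers.pop(timer_id, None)

    def pending(self):
        return dict(self._timers)


def apply_ops(ops):
    """Replay an ordered sequence of ("set", id, ts) / ("clear", id) operations."""
    store = TimerStore()
    for op in ops:
        if op[0] == "set":
            store.set(op[1], op[2])
        else:
            store.clear(op[1])
    return store.pending()


ops = [("set", "gc", 10), ("clear", "gc"), ("set", "gc", 20)]
in_order = apply_ops(ops)                        # timer "gc" pending at 20
reordered = apply_ops([ops[0], ops[2], ops[1]])  # clear applied last: nothing pending
```

The same three operations yield one pending timer in the first order and none
in the second, so a runner has to preserve the order in which they were issued.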

In addition, these self-loops add complexity to the graph but provide no
additional information--they are entirely redundant with the timerspecs
already present on DoFns. Generally I prefer less redundancy in the spec,
rather than have it be over-constrained. It's unclear what a runner that
didn't introspect the DoFn's TimerSpecs would do with these special
edges, and also unclear how they would differ from possible self-loops due
to more traditional iteration.

The primary motivation to express timers in this way seems to be the desire
to push them to workers using the data plane, rather than inventing another
mechanism or making them pull-based like with state. I think this could be
done by simply adding a Timer field to the Elements (or Data) proto. (Note
that this is not the same as having hacky ElementOrTimer elements flow
through the graph.) Writes would be state requests, and perhaps it would
even make sense to "read" the current value of an unfired timer over the
state API, to be able to set things like
{min,max}(new_timestamp, old_timestamp).
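
As a rough sketch of what this could look like (plain Python, not the actual
Fn API protos): Timer, Elements, TimerState and set_timer_to_min below are
all hypothetical names chosen for illustration, and the real message layout
would be defined in the proto files. The sketch assumes fired timers travel in
a separate field next to element data, while timer writes and reads go through
a state-like interface.

```python
# Illustrative sketch only: Timer, Elements, TimerState and set_timer_to_min
# are hypothetical names, not the actual Beam Fn API protos or state API.
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Timer:
    timer_id: str
    timestamp: int


@dataclass
class Elements:
    """One data-plane message carrying element data and timers in separate fields."""
    data: List[bytes] = field(default_factory=list)
    timers: List[Timer] = field(default_factory=list)


class TimerState:
    """Timers written and read through a state-like interface."""

    def __init__(self) -> None:
        self._timers: Dict[str, int] = {}

    def read(self, timer_id: str) -> Optional[int]:
        return self._timers.get(timer_id)

    def write(self, timer_id: str, timestamp: int) -> None:
        self._timers[timer_id] = timestamp


def set_timer_to_min(state: TimerState, timer_id: str, new_timestamp: int) -> int:
    """Read the unfired timer, then write min(new_timestamp, old_timestamp)."""
    old_timestamp = state.read(timer_id)
    if old_timestamp is None:
        chosen = new_timestamp
    else:
        chosen = min(new_timestamp, old_timestamp)
    state.write(timer_id, chosen)
    return chosen


state = TimerState()
first = set_timer_to_min(state, "flush", 50)   # no earlier value, keeps 50
second = set_timer_to_min(state, "flush", 30)  # earlier than 50, moves to 30
third = set_timer_to_min(state, "flush", 40)   # later than 30, stays at 30
batch = Elements(data=[b"a"], timers=[Timer("flush", state.read("flush"))])
```

Keeping timers in their own field means elements and timers never have to
share one union type flowing through the graph.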

(We could alternatively attempt to model State(s) as a PCollection(s), but
this is more speculative and would likely exacerbate some of the issues
above (though it could open the door for DoFns that somehow *share* state).
They seem like different objects though, one is a mutable store, the other
an immutable stream.)

I realize this is a big shift, but we could probably adapt the existing
Python/Java implementations fairly easily (and it would probably simplify
them). And it's easier to do simplifications like this sooner rather than
later.

What do people think about this? Any obvious (or not-so-obvious) downsides
that I'm missing?

- Robert


Re: Re: How to optimize the performance of Beam on Spark(Internet mail)

2018-09-19 Thread 段丁瑞
Got it.
I will also set "spark.dynamicAllocation.enabled=true" to test.


From: Tim Robertson
Date: 2018-09-19 17:04
To: dev@beam.apache.org
CC: j...@nanthrax.net
Subject: Re: Re: How to optimize the performance of Beam on Spark(Internet mail)
Thank you Devin

Can you also please try Beam with more spark executors if you are able?

On Wed, Sep 19, 2018 at 10:47 AM devinduan(段丁瑞) wrote:
Thanks for your help!
I will test other examples of Beam on Spark in the future and then report back
the results.
Regards
devin

From: Jean-Baptiste Onofré
Date: 2018-09-19 16:32
To: devinduan(段丁瑞); dev
Subject: Re: How to optimize the performance of Beam on Spark(Internet mail)

Thanks for the details.

I will take a look later tomorrow (I have another issue to investigate
on the Spark runner today for Beam 2.7.0 release).

Regards
JB

On 19/09/2018 08:31, devinduan(段丁瑞) wrote:
> Hi,
> I tested with a 300MB data file, using a command like:
> ./spark-submit --master yarn --deploy-mode client  --class
> com.test.BeamTest --executor-memory 1g --num-executors 1 --driver-memory 1g
>
> I set only one executor, so tasks run in sequence. One task takes 10s.
> However, a Spark task takes only 0.4s.
>
>
>
> *From:* Jean-Baptiste Onofré 
> *Date:* 2018-09-19 12:22
> *To:* dev@beam.apache.org 
> 
> *Subject:* Re: How to optimize the performance of Beam on
> Spark(Internet mail)
>
> Hi,
>
> did you compare the stages in the Spark UI in order to identify which
> stage is taking time ?
>
> You use spark-submit in both cases for the bootstrapping ?
>
> I will do a test here as well.
>
> Regards
> JB
>
> On 19/09/2018 05:34, devinduan(段丁瑞) wrote:
> > Hi,
> > Thanks for your reply.
> > Our team plans to use Beam instead of Spark, so I'm testing the
> > performance of the Beam API.
> > I'm coding some examples with both the Spark API and the Beam API, like
> > "WordCount", "Join", "OrderBy", "Union" ...
> > I use the same resources and configuration to run these jobs.
> > Tim said I should remove "withNumShards(1)" and
> > set spark.default.parallelism=32. I did that and tried again, but the
> > Beam job is still running very slowly.
> > Here is my Beam code and Spark code:
> > Beam "WordCount":
> >
> > Spark "WordCount":
> >
> > I will try the other examples later.
> > Regards
> > devin
> >
> >
> > *From:* Jean-Baptiste Onofré 
> > *Date:* 2018-09-18 22:43
> > *To:* dev@beam.apache.org 
> 
> > *Subject:* Re: How to optimize the performance of Beam on
> > Spark(Internet mail)
> >
> > Hi,
> >
> > The first huge difference is the fact that the Spark runner still uses
> > RDDs, whereas when using Spark directly you are using Datasets. A bunch
> > of optimizations in Spark are related to Datasets.
> >
> > I started a large refactoring of the Spark runner to leverage Spark 2.x
> > (and Datasets).
> > It's not yet ready, as it includes other improvements (the portability
> > layer with the Job API, a first check of the state API, ...).
> >
> > Anyway, by Spark wordcount, you mean the one included in the Spark
> > distribution?
> >
> > Regards
> > JB
> >
> > On 18/09/2018 08:39, devinduan(段丁瑞) wrote:
> > > Hi,
> > > I'm testing Beam on Spark.
> > > Using the Spark example code WordCount to process a 1G data file takes
> > > 1 minute.
> > > However, using the Beam example code WordCount to process the same file
> > > takes 30 minutes.
> > > My Spark parameters are: --deploy-mode client --executor-memory 1g
> > > --num-executors 1 --driver-memory 1g
> > > My Spark version is 2.3.1; my Beam version is 2.5.
> > > Is there any way to optimize this?
> > > Thank you.
> > >
> > >
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com



Re: Re: How to optimize the performance of Beam on Spark(Internet mail)

2018-09-19 Thread Tim Robertson
Thank you Devin

Can you also please try Beam with more spark executors if you are able?

On Wed, Sep 19, 2018 at 10:47 AM devinduan(段丁瑞) 
wrote:

> Thanks for your help!
> I will test other examples of Beam On Spark in the future and then feed
> back the results.
> Regards
> devin
>
>
> *From:* Jean-Baptiste Onofré 
> *Date:* 2018-09-19 16:32
> *To:* devinduan(段丁瑞) ; dev 
> *Subject:* Re: How to optimize the performance of Beam on Spark(Internet
> mail)
>
> Thanks for the details.
>
> I will take a look later tomorrow (I have another issue to investigate
> on the Spark runner today for Beam 2.7.0 release).
>
> Regards
> JB
>
> On 19/09/2018 08:31, devinduan(段丁瑞) wrote:
> > Hi,
> > I tested with a 300MB data file, using a command like:
> > ./spark-submit --master yarn --deploy-mode client --class
> > com.test.BeamTest --executor-memory 1g --num-executors 1
> > --driver-memory 1g
> >
> > I set only one executor, so the tasks run in sequence. One task takes
> > 10s with Beam; with Spark, a task takes only 0.4s.
> >
> >
> >
> > *From:* Jean-Baptiste Onofré
> > *Date:* 2018-09-19 12:22
> > *To:* dev@beam.apache.org
> > *Subject:* Re: How to optimize the performance of Beam on
> > Spark(Internet mail)
> >
> > Hi,
> >
> > Did you compare the stages in the Spark UI in order to identify which
> > stage is taking time?
> >
> > Do you use spark-submit in both cases for the bootstrapping?
> >
> > I will do a test here as well.
> >
> > Regards
> > JB
> >
> > On 19/09/2018 05:34, devinduan(段丁瑞) wrote:
> > > Hi,
> > > Thanks for your reply.
> > > Our team plans to use Beam instead of Spark, so I'm testing the
> > > performance of the Beam API.
> > > I'm coding some examples with both the Spark API and the Beam API,
> > > such as "WordCount", "Join", "OrderBy", "Union", ...
> > > I use the same resources and configuration to run these jobs.
> > > Tim said I should remove "withNumShards(1)" and set
> > > spark.default.parallelism=32. I did so and tried again, but the Beam
> > > job still runs very slowly.
> > > Here is my Beam code and Spark code:
> > > Beam "WordCount":
> > >
> > > Spark "WordCount":
> > >
> > > I will try the other examples later.
> > >
> > > Regards
> > > devin
> > >
> > >
> > > *From:* Jean-Baptiste Onofré
> > > *Date:* 2018-09-18 22:43
> > > *To:* dev@beam.apache.org
> > > *Subject:* Re: How to optimize the performance of Beam on
> > > Spark(Internet mail)
> > >
> > > Hi,
> > >
> > > The first huge difference is the fact that the Spark runner still uses
> > > RDD, whereas when using Spark directly, you are using Dataset. A bunch
> > > of optimizations in Spark are related to Dataset.
> > >
> > > I started a large refactoring of the Spark runner to leverage Spark 2.x
> > > (and Dataset).
> > > It's not yet ready as it includes other improvements (the portability
> > > layer with Job API, a first check of state API, ...).
> > >
> > > Anyway, by Spark wordcount, you mean the one included in the Spark
> > > distribution?
> > >
> > > Regards
> > > JB
> > >
> > > On 18/09/2018 08:39, devinduan(段丁瑞) wrote:
> > > > Hi,
> > > > I'm testing Beam on Spark.
> > > > I ran the Spark example WordCount on a 1G data file; it took 1 minute.
> > > > However, the Beam example WordCount took 30 minutes on the same file.
> > > > My Spark parameters are: --deploy-mode client --executor-memory 1g
> > > > --num-executors 1 --driver-memory 1g
> > > > My Spark version is 2.3.1 and my Beam version is 2.5.
> > > > Is there any way to optimize this?
> > > > Thank you.
> > > >
> > > >
> > >
> > > --
> > > Jean-Baptiste Onofré
> > > jbono...@apache.org
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
> > >
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
>


Re: Re: How to optimize the performance of Beam on Spark(Internet mail)

2018-09-19 Thread 段丁瑞
Thanks for your help!
I will test other examples of Beam On Spark in the future and then feed back 
the results.
Regards
devin

From: Jean-Baptiste Onofré
Date: 2018-09-19 16:32
To: devinduan(段丁瑞); dev
Subject: Re: How to optimize the performance of Beam on Spark(Internet mail)

Thanks for the details.

I will take a look later tomorrow (I have another issue to investigate
on the Spark runner today for Beam 2.7.0 release).

Regards
JB

On 19/09/2018 08:31, devinduan(段丁瑞) wrote:
> Hi,
> I tested with a 300MB data file, using a command like:
> ./spark-submit --master yarn --deploy-mode client --class
> com.test.BeamTest --executor-memory 1g --num-executors 1 --driver-memory 1g
>
> I set only one executor, so the tasks run in sequence. One task takes
> 10s with Beam; with Spark, a task takes only 0.4s.
>
>
>
> *From:* Jean-Baptiste Onofré 
> *Date:* 2018-09-19 12:22
> *To:* dev@beam.apache.org 
> *Subject:* Re: How to optimize the performance of Beam on
> Spark(Internet mail)
>
> Hi,
>
> Did you compare the stages in the Spark UI in order to identify which
> stage is taking time?
>
> Do you use spark-submit in both cases for the bootstrapping?
>
> I will do a test here as well.
>
> Regards
> JB
>
> On 19/09/2018 05:34, devinduan(段丁瑞) wrote:
> > Hi,
> > Thanks for your reply.
> > Our team plans to use Beam instead of Spark, so I'm testing the
> > performance of the Beam API.
> > I'm coding some examples with both the Spark API and the Beam API,
> > such as "WordCount", "Join", "OrderBy", "Union", ...
> > I use the same resources and configuration to run these jobs.
> > Tim said I should remove "withNumShards(1)" and set
> > spark.default.parallelism=32. I did so and tried again, but the Beam
> > job still runs very slowly.
> > Here is my Beam code and Spark code:
> > Beam "WordCount":
> >
> > Spark "WordCount":
> >
> > I will try the other examples later.
> >
> > Regards
> > devin
> >
> >
> > *From:* Jean-Baptiste Onofré 
> > *Date:* 2018-09-18 22:43
> > *To:* dev@beam.apache.org 
> > *Subject:* Re: How to optimize the performance of Beam on
> > Spark(Internet mail)
> >
> > Hi,
> >
> > The first huge difference is the fact that the Spark runner still uses
> > RDD, whereas when using Spark directly, you are using Dataset. A bunch
> > of optimizations in Spark are related to Dataset.
> >
> > I started a large refactoring of the Spark runner to leverage Spark 2.x
> > (and Dataset).
> > It's not yet ready as it includes other improvements (the portability
> > layer with Job API, a first check of state API, ...).
> >
> > Anyway, by Spark wordcount, you mean the one included in the Spark
> > distribution?
> >
> > Regards
> > JB
> >
> > On 18/09/2018 08:39, devinduan(段丁瑞) wrote:
> > > Hi,
> > > I'm testing Beam on Spark.
> > > I ran the Spark example WordCount on a 1G data file; it took 1 minute.
> > > However, the Beam example WordCount took 30 minutes on the same file.
> > > My Spark parameters are: --deploy-mode client --executor-memory 1g
> > > --num-executors 1 --driver-memory 1g
> > > My Spark version is 2.3.1 and my Beam version is 2.5.
> > > Is there any way to optimize this?
> > > Thank you.
> > >
> > >
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com



Re: How to optimize the performance of Beam on Spark(Internet mail)

2018-09-19 Thread Jean-Baptiste Onofré
Thanks for the details.

I will take a look later tomorrow (I have another issue to investigate
on the Spark runner today for Beam 2.7.0 release).

Regards
JB

On 19/09/2018 08:31, devinduan(段丁瑞) wrote:
> Hi,
> I tested with a 300MB data file, using a command like:
> ./spark-submit --master yarn --deploy-mode client --class
> com.test.BeamTest --executor-memory 1g --num-executors 1 --driver-memory 1g
>
> I set only one executor, so the tasks run in sequence. One task takes
> 10s with Beam; with Spark, a task takes only 0.4s.
> 
> 
> 
> *From:* Jean-Baptiste Onofré 
> *Date:* 2018-09-19 12:22
> *To:* dev@beam.apache.org 
> *Subject:* Re: How to optimize the performance of Beam on
> Spark(Internet mail)
> 
> Hi,
> 
> Did you compare the stages in the Spark UI in order to identify which
> stage is taking time?
>
> Do you use spark-submit in both cases for the bootstrapping?
> 
> I will do a test here as well.
> 
> Regards
> JB
> 
> On 19/09/2018 05:34, devinduan(段丁瑞) wrote:
> > Hi,
> > Thanks for your reply.
> > Our team plans to use Beam instead of Spark, so I'm testing the
> > performance of the Beam API.
> > I'm coding some examples with both the Spark API and the Beam API,
> > such as "WordCount", "Join", "OrderBy", "Union", ...
> > I use the same resources and configuration to run these jobs.
> > Tim said I should remove "withNumShards(1)" and set
> > spark.default.parallelism=32. I did so and tried again, but the Beam
> > job still runs very slowly.
> > Here is my Beam code and Spark code:
> > Beam "WordCount":
> >
> > Spark "WordCount":
> >
> > I will try the other examples later.
> >     
> > Regards
> > devin
> >
> >  
> > *From:* Jean-Baptiste Onofré 
> > *Date:* 2018-09-18 22:43
> > *To:* dev@beam.apache.org 
> > *Subject:* Re: How to optimize the performance of Beam on
> > Spark(Internet mail)
> >
> > Hi,
> >
> > The first huge difference is the fact that the Spark runner still uses
> > RDD, whereas when using Spark directly, you are using Dataset. A bunch
> > of optimizations in Spark are related to Dataset.
> >
> > I started a large refactoring of the Spark runner to leverage Spark 2.x
> > (and Dataset).
> > It's not yet ready as it includes other improvements (the portability
> > layer with Job API, a first check of state API, ...).
> >
> > Anyway, by Spark wordcount, you mean the one included in the Spark
> > distribution?
> >
> > Regards
> > JB
> >
> > On 18/09/2018 08:39, devinduan(段丁瑞) wrote:
> > > Hi,
> > > I'm testing Beam on Spark.
> > > I ran the Spark example WordCount on a 1G data file; it took 1 minute.
> > > However, the Beam example WordCount took 30 minutes on the same file.
> > > My Spark parameters are: --deploy-mode client --executor-memory 1g
> > > --num-executors 1 --driver-memory 1g
> > > My Spark version is 2.3.1 and my Beam version is 2.5.
> > > Is there any way to optimize this?
> > > Thank you.
> > >
> > >    
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
> 
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Proposal for Beam Python User State and Timer APIs

2018-09-19 Thread Robert Bradshaw
And its implementation of the Fn API is on its way too:

https://github.com/apache/beam/pull/6349
https://github.com/apache/beam/pull/6433


On Tue, Sep 18, 2018 at 6:56 PM Charles Chen  wrote:

> An update: the reference DirectRunner implementation of (and common
> execution code for) the Python user state and timers API has been merged:
> https://github.com/apache/beam/pull/6304
>
> On Thu, Aug 30, 2018 at 1:48 AM Charles Chen  wrote:
>
>> Another update: the reference DirectRunner implementation of the Python
>> user state and timers API is out for review:
>> https://github.com/apache/beam/pull/6304
>>
>> On Mon, Jul 9, 2018 at 2:18 PM Charles Chen  wrote:
>>
>>> An update: https://github.com/apache/beam/pull/5691 has been merged.  I
>>> hope to send out a reference implementation in the DirectRunner soon.  On
>>> the roadmap after that is work on the relevant portability interfaces here
>>> so we can get this working on runners like Beam Python on Flink.
>>>
>>> On Wed, Jun 20, 2018 at 10:00 AM Charles Chen  wrote:
>>>
>>>> An update on the implementation: I recently sent the user-facing
>>>> pipeline construction part of the API implementation out for review:
>>>> https://github.com/apache/beam/pull/5691.
>>>>
>>>> On Tue, Jun 5, 2018 at 5:26 PM Charles Chen  wrote:
>>>>
>>>>> Thanks everyone for contributing here.  We've reached rough consensus
>>>>> on the approach we should take with this API, and I've summarized this in
>>>>> the new "Community consensus" sections I added to the doc (
>>>>> https://s.apache.org/beam-python-user-state-and-timers).  I will
>>>>> begin initial implementation of this API soon.
>>>>>
>>>>> On Wed, May 23, 2018 at 8:08 PM Thomas Weise  wrote:
>>>>>
>>>>>> Nice proposal; it's exciting to see this about to be added to the SDK
>>>>>> as it enables a set of more complex use cases.
>>>>>>
>>>>>> I also think that some of the content can later be repurposed as user
>>>>>> documentation.
>>>>>>
>>>>>> Thanks,
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>> On Wed, May 23, 2018 at 11:49 AM, Charles Chen  wrote:
>>>>>>
>>>>>>> Thanks everyone for the detailed comments and discussions.  It looks
>>>>>>> like by now, we mostly agree with the requirements and overall direction
>>>>>>> needed for the API, though there is continuing discussion on specific
>>>>>>> details.  I want to highlight two new sections of the doc, which address
>>>>>>> some discussions that have come up:
>>>>>>>
>>>>>>>    - *Existing state and transactionality*: this section addresses
>>>>>>>    how we will address an existing transactionality inconsistency in the
>>>>>>>    existing Java API.  (
>>>>>>>    https://docs.google.com/document/d/1GadEkAmtbJQjmqiqfSzGw3b66TKerm8tyn6TK4blAys/edit#heading=h.ofyl9jspiz3b
>>>>>>>    )
>>>>>>>    - *State for merging windows*: this section addresses how we
>>>>>>>    will deal with non-combinable state in conjunction with merging
>>>>>>>    windows.  (
>>>>>>>    https://docs.google.com/document/d/1GadEkAmtbJQjmqiqfSzGw3b66TKerm8tyn6TK4blAys/edit#heading=h.ctxkcgabtzpy
>>>>>>>    )
>>>>>>>
>>>>>>> Let me know any further comments and suggestions.
>>>>>>>
>>>>>>> On Tue, May 22, 2018 at 9:29 AM Kenneth Knowles  wrote:
>>>>>>>
>>>>>>>> Nice. I know that Java users have found it helpful to have this
>>>>>>>> lower-level way of writing pipelines when the high-level primitives
>>>>>>>> don't quite have the tight control they are looking for. I hope it
>>>>>>>> will be a big draw for Python, too.
>>>>>>>>
>>>>>>>> (commenting on the doc)
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Mon, May 21, 2018 at 5:15 PM Charles Chen  wrote:
>>>>>>>>
>>>>>>>>> I want to share a proposal for adding user state and timer support
>>>>>>>>> to the Beam Python SDK and get the community's thoughts on how such
>>>>>>>>> an API should look:
>>>>>>>>> https://s.apache.org/beam-python-user-state-and-timers
>>>>>>>>>
>>>>>>>>> Let me know what you think and please add any comments and
>>>>>>>>> suggestions you may have.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Charles
>>>>>>>>>
>>>>>>>>
>>>>>>