Re: published containers overwrite locally built containers

2019-11-01 Thread Thomas Weise
More here:
https://lists.apache.org/thread.html/07131e314e229ec60100eaa2c0cf6dfc206bf2b0f78c3cee9ebb0bda@%3Cdev.beam.apache.org%3E


On Fri, Nov 1, 2019 at 10:56 AM Chamikara Jayalath 
wrote:

> I think it makes sense to override published docker images with locally
> built versions when testing HEAD.
>
> Thanks,
> Cham
>
> On Thu, Oct 31, 2019 at 6:31 PM Heejong Lee  wrote:
>
>> Hi, happy halloween!
>>
>> I'm looking into failing cross language post commit tests:
>> https://issues.apache.org/jira/browse/BEAM-8534
>> 
>>
>> After a few runs, I've found that published SDK harness containers
>> overwrite locally built containers when docker pull happens. I can think of
>> two possible solutions here: 1) remove the published images with the latest
>> tag, so that the image with the latest tag is available for testing and
>> development purposes. 2) add a serialVersionUID to the class printing out the
>> error.
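
A minimal sketch of what option 2 would look like; the class name here is
hypothetical (not the actual class from BEAM-8534), the point is just pinning an
explicit serialVersionUID so published and locally built containers agree on the
serialized form:

import java.io.Serializable;

public class ExamplePayload implements Serializable {
  // Explicit, stable version id instead of the compiler-generated default.
  private static final long serialVersionUID = 1L;

  private final String value;

  public ExamplePayload(String value) {
    this.value = value;
  }

  public String getValue() {
    return value;
  }
}
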
>>
>> 2) doesn't sound like a fundamental solution if we're not going to attach
>> serialVersionUID to all serializable classes. 1) might work but I'm not
>> sure whether there's another use for the latest tag somewhere. Any better
>> ideas?
>>
>> Thanks,
>> Heejong
>>
>


Re: Python Beam pipelines on Flink on Kubernetes

2019-11-01 Thread Thomas Weise
That's a good idea. Probably best to add an example in:
https://github.com/lyft/flinkk8soperator

Do you want to add an issue?

(It will have to wait for 2.18 release though.)


On Fri, Nov 1, 2019 at 11:37 AM Chad Dombrova  wrote:

> Hi Thomas,
> Do you have an example Dockerfile demonstrating best practices for
> building an image that contains both Flink and Beam SDK dependencies?  That
> would be useful.
>
> -chad
>
>
> On Fri, Nov 1, 2019 at 10:18 AM Thomas Weise  wrote:
>
>> For folks looking to run Beam on Flink on k8s, see update in [1]
>>
>> I also updated [2]
>>
>> TLDR: at this time the best option to run portable pipelines on k8s is to
>> create container images that have both Flink and the SDK dependencies.
>>
>> I'm curious how much interest there is in using the official SDK container
>> images and keeping Flink and the portable pipeline separate as far as the
>> image build goes? Deployment can be achieved with the sidecar container
>> approach. Most of the mechanics are in place already; one addition would be
>> an abstraction for where the SDK entry point executes (process, external),
>> similar to how we have it for the workers.
>>
>> Thanks,
>> Thomas
>>
>> [1]
>> https://lists.apache.org/thread.html/10aada9c200d4300059d02e4baa47dda0cc2c6fc9432f950194a7f5e@%3Cdev.beam.apache.org%3E
>>
>> [2]
>> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI
>>
>> On Wed, Aug 21, 2019 at 9:44 PM Thomas Weise  wrote:
>>
>>> The changes to containerize the Python SDK worker pool are nearly
>>> complete. I also updated the document for next implementation steps.
>>>
>>> The favored approach (initially targeted option) for pipeline submission
>> is support for the (externally created) fat jar. It will keep changes to
>>> the operator to a minimum and is applicable for any other Flink job as well.
>>>
>>> Regarding fine grained resource scheduling, that would happen within the
>>> pods scheduled by the k8s operator (or other external orchestration tool)
>>> or, further down the road, in a completely elastic/dynamic fashion with
>>> active execution mode (where Flink would request resources directly from
>>> k8s, similar to how it would work on YARN).
>>>
>>>
>>> On Tue, Aug 13, 2019 at 10:59 AM Chad Dombrova 
>>> wrote:
>>>
 Hi Thomas,
 Nice work!  It's really clearly presented.

 What's the current favored approach for pipeline submission?

 I'm also interested to know how this plan overlaps (if at all) with the
 work on Fine-Grained Resource Scheduling [1][2] that's being done for Flink
 1.9+, which has implications for creation of task managers in kubernetes.

 [1]
 https://docs.google.com/document/d/1h68XOG-EyOFfcomd2N7usHK1X429pJSMiwZwAXCwx1k/edit#heading=h.72szmms7ufza
 [2] https://issues.apache.org/jira/browse/FLINK-12761

 -chad


 On Tue, Aug 13, 2019 at 9:14 AM Thomas Weise  wrote:

> There have been a few comments in the doc and I'm going to start
> working on the worker execution part.
>
> Instead of running a Docker container for each worker, the preferred
> option is to use the worker pool:
>
>
> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.bzs6y6ms0898
>
> There are some notes at the end of the doc regarding the
> implementation. It would be great if those interested in the environments
> and Python docker container could take a look. In a nutshell, the proposal
> is to make a few changes to the Python container so that it (optionally) can
> be used to run the worker pool.
>
> Thanks,
> Thomas
>
>
> On Wed, Jul 24, 2019 at 9:42 PM Pablo Estrada 
> wrote:
>
>> I am very happy to see this. I'll take a look, and leave my comments.
>>
>> I think this is something we'd been needing, and it's great that you
>> guys are putting thought into it. Thanks!<3
>>
>> On Wed, Jul 24, 2019 at 9:01 PM Thomas Weise  wrote:
>>
>>> Hi,
>>>
>>> Recently Lyft open sourced *FlinkK8sOperator,* a Kubernetes
>>> operator to manage Flink deployments on Kubernetes:
>>>
>>> https://github.com/lyft/flinkk8soperator/
>>>
>>> We are now discussing how to extend this operator to also support
>>> deployment of Python Beam pipelines with the Flink runner. I would like 
>>> to
>>> share the proposal with the Beam community to enlist feedback as well as
>>> explore opportunities for collaboration:
>>>
>>>
>>> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/
>>>
>>> Looking forward to your comments and suggestions!
>>>
>>> Thomas
>>>
>>>


***UNCHECKED*** Re: published containers overwrite locally built containers

2019-11-01 Thread Kyle Weaver
For additional context, this was discussed weeks ago on this list:
https://lists.apache.org/thread.html/932fe0bc838b92e80475b2bf862e6cec34fbd6ac0d4f3c9de5ac25e1@%3Cdev.beam.apache.org%3E


On Fri, Nov 1, 2019 at 10:56 AM Chamikara Jayalath 
wrote:

> I think it makes sense to override published docker images with locally
> built versions when testing HEAD.
>
> Thanks,
> Cham
>
> On Thu, Oct 31, 2019 at 6:31 PM Heejong Lee  wrote:
>
>> Hi, happy halloween!
>>
>> I'm looking into failing cross language post commit tests:
>> https://issues.apache.org/jira/browse/BEAM-8534
>> 
>>
>> After a few runs, I've found that published SDK harness containers
>> overwrite locally built containers when docker pull happens. I can think of
>> two possible solutions here: 1) remove the published images with the latest
>> tag, so that the image with the latest tag is available for testing and
>> development purposes. 2) add a serialVersionUID to the class printing out the
>> error.
>>
>> 2) doesn't sound like a fundamental solution if we're not going to attach
>> serialVersionUID to all serializable classes. 1) might work but I'm not
>> sure whether there's another use for the latest tag somewhere. Any better
>> ideas?
>>
>> Thanks,
>> Heejong
>>
>


***UNCHECKED*** Re: published containers overwrite locally built containers

2019-11-01 Thread Heejong Lee
Since 'docker run' automatically pulls when the image doesn't exist
locally, I think it's safe to remove the explicit 'docker pull' before 'docker
run'. Without 'docker pull', we won't update the local image with the
remote image (for the same tag), but that shouldn't be a problem in prod, where
a unique tag is assumed for each released version.

On Fri, Nov 1, 2019 at 10:56 AM Chamikara Jayalath 
wrote:

> I think it makes sense to override published docker images with locally
> built versions when testing HEAD.
>
> Thanks,
> Cham
>
> On Thu, Oct 31, 2019 at 6:31 PM Heejong Lee  wrote:
>
>> Hi, happy halloween!
>>
>> I'm looking into failing cross language post commit tests:
>> https://issues.apache.org/jira/browse/BEAM-8534
>> 
>>
>> After a few runs, I've found that published SDK harness containers
>> overwrite locally built containers when docker pull happens. I can think of
>> two possible solutions here: 1) remove the published images with the latest
>> tag, so that the image with the latest tag is available for testing and
>> development purposes. 2) add a serialVersionUID to the class printing out the
>> error.
>>
>> 2) doesn't sound like a fundamental solution if we're not going to attach
>> serialVersionUID to all serializable classes. 1) might work but I'm not
>> sure whether there's another use for the latest tag somewhere. Any better
>> ideas?
>>
>> Thanks,
>> Heejong
>>
>


Re: aggregating over triggered results

2019-11-01 Thread Robert Bradshaw
On Thu, Oct 31, 2019 at 8:48 PM Aaron Dixon  wrote:
>
> First of all thank you for taking the time on this very clear and helpful 
> message. Much appreciated.
>
> >I suppose one could avoid doing any pre-aggregation, and emit all of
> the events (with reified timestamp) in 60/30-day windows, then have a
> DoFn that filters on the events and computes each of the 10-minute
> aggregates over the "true" sliding window (4320 outputs). This could
> be cheaper if your events are very sparse, will be more expensive if
> they're very dense, and it's unclear what the tradeoff will be.
>
> This is exactly what I was doing (trying to do): reify the events and filter
> them to compute my own desired window for the trigger. I have lots of
> events, but each key has few events (in the thousands). I think your point
> is that even this is not necessarily a win; the events overall would have to
> be quite sparse for it to be a win, and it's unclear by how much. So I can
> see why this is perhaps not a great thread to pursue.
>
> On another note, trying to use *periodic* triggers like this in 
> *intermediate* pipeline stages and leverage them in downstream aggregations 
> was something I was trying to do here and in a few other cases. (I'm new to 
> Beam and triggers seemed fundamental so I expected to not get so lost trying 
> to use them this way.) But at least at this stage of my understanding I think 
> this was misplaced... periodic triggers seem primarily important say at the 
> last stage of a pipeline where you may be writing updates to an actual 
> sink/table.
>
> In other words suppose the above (60/30 day sliding) approach turned out to 
> be more efficient... I still have no idea if, using Beam, I'd be able to 
> properly regroup on the other side and pick out all the "latest triggered" 
> events from the rest... or even know when I've got them. This was the source 
> of my original question, but I'm now just thinking this is just not what 
> people do in Beam pipelines... periodically trigger windows _in the middle_ 
> of a pipeline. Am I on the right track in this thinking? If so, I wonder if 
> the API would better reflect this? If it's a doomed strategy to try to 
> periodically trigger 'into' downstream aggregations, why is the API so 
> friendly to doing just this?

Yes, see e.g. 
https://docs.google.com/document/d/17H2sBEtnoTSxjzlrz7rmKtX5E3F0mW1NpFQzWzSYOpY
. As an intermediate point (and stepping stone) we want to at least
have retractions:
https://docs.google.com/document/d/14WRfxwk_iLUHGPty3C6ZenddPsp_d6jhmx0vuafXqmE
. Triggering is an advanced, and somewhat thorny (and not as fleshed
out) concept (e.g. it introduces non-determinism). It's basically
trying to solve the problem of seeing versions of aggregations that
are not gated by the watermark (either early, before the watermark has
declared that you've seen all the data, or late, in case the watermark
was wrong (watermarks can be heuristic, as perfect certainty might be
too slow/expensive)).

> On Wed, Oct 30, 2019 at 5:37 PM Robert Bradshaw  wrote:
>>
>> On Tue, Oct 29, 2019 at 7:01 PM Aaron Dixon  wrote:
>> >
>> > Thank you, Luke and Robert. Sorry for hitting dev@, I criss-crossed and 
>> > meant to hit user@, but as we're here could you clarify your two points, 
>> > however--
>>
>> No problem. This is veering into dev@ territory anyway :).
>>
>> > 1) I am under the impression that the 4,000 sliding windows approach (30 
>> > days every 10m) will re-evaluate my combine aggregation every 10m whereas 
>> > with the two-window approach my Combine aggregation would evolve 
>> > iteratively, only merging new results into the aggregation.
>> >
>> > If there's a cross-window optimization occurring that would allow 
>> > iterative combining _across windows_, given the substantial order of 
>> > magnitude difference in scale at play, is it safe to consider such 
>> > 'internal optimization detail' part of the platform contract (Dataflow's, 
>> > say)? Otherwise it would be hard to lean on this from a production system 
>> > that will live into the future.
>>
>> OK, let's first define exactly what (I think) you're trying to
>> compute. Let's label windows by their (upper) endpoint. So, every 10
>> minutes you have a window W_t and an aggregate Aggr(e for e in
>> all_events if t - 60days <= timestamp(e) < t).
>>
>> The way this is computed in Beam is by storing a map W_t ->
>> RunningAggregate and whenever we see an element with timestamp T we
>> assign it to the set of windows S = {W_t : T in W_t} (in this case
>> there would be 30*24*6 = 4320 of them) and subsequently update all the
>> running aggregates. When we are sure we've seen all elements up to t
>> (the watermark) we release window W_t with its computed aggregate
>> downstream.
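
A plain-Java sketch of this bookkeeping, assuming 30-day windows sliding every 10
minutes (which matches the 4320 window count above) and a simple sum as the
aggregate; this is an illustration, not Beam runner code:

import java.util.Map;
import java.util.TreeMap;
import org.joda.time.Duration;
import org.joda.time.Instant;

class SlidingWindowSums {
  private static final Duration SIZE = Duration.standardDays(30);
  private static final Duration PERIOD = Duration.standardMinutes(10);

  // Window upper endpoint W_t -> running aggregate (here just a sum).
  private final Map<Instant, Long> runningAggregates = new TreeMap<>();

  void add(Instant eventTime, long value) {
    // First window end strictly after the event time, aligned to the 10-minute period.
    long periodMillis = PERIOD.getMillis();
    long firstEnd = ((eventTime.getMillis() / periodMillis) + 1) * periodMillis;
    // The element falls into every window whose end t satisfies
    // eventTime < t <= eventTime + 30 days, i.e. 30*24*6 = 4320 windows.
    for (Instant end = new Instant(firstEnd);
        !end.isAfter(eventTime.plus(SIZE));
        end = end.plus(PERIOD)) {
      runningAggregates.merge(end, value, Long::sum);
    }
  }

  // Once the watermark passes a window end, its aggregate is released downstream.
  Long release(Instant windowEnd) {
    return runningAggregates.remove(windowEnd);
  }
}

Each incoming element touches 4320 running aggregates here, which is the
per-element cost being discussed.
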
>>
>> An alternative that's often proposed, and works only for aligned
>> sliding windows, is to instead store a map of 10-minute buckets to
>> running aggregates, and whenever an element comes in we add its value
>> to the aggregate 

Re: Python SDK timestamp precision

2019-11-01 Thread Robert Bradshaw
On Fri, Nov 1, 2019 at 2:17 AM Jan Lukavský  wrote:
>
>  > Yes, this is the "minus epsilon" idea, but assigning this as a bit on
> the WindowedValue rather than on the Timestamp itself. This means that
> pulling the timestamp out then re-assigning it would be lossy. (As a
> basic example, imagine the batching DoFn that batches up elements (with
> their respective metadata), calls an external service on the full batch,
> and then emits the results (with the appropriate, cached, metadata).
>
> I'm not sure if I follow - I'd say that in the example the "limit bit"
> could be part of the metadata, so that if the external service returns a
> response, the limiting bit could be added back. Actually a PTransform
> for manipulating this limit bit should be accessible by users,
> because it can be useful when reading data from an external service (e.g.
> Kafka).
>
> Yes, the current approach of subtracting 1 (millisecond, nanosecond,
> picosecond, whatever) ensures that we don't have to hold this metadata,
> because it is part of the timestamp. But we are not modeling the reality
> correctly. In reality the output for a window with range
> <02:00:00,03:00:00) (half-closed) cannot happen *before* 03:00:00. Yes,
> its validity interval might depend on the precision -
> <02:00:00-02:59:59.999> in case of milliseconds,
> <02:00:00-02:59:59.999999999> in case of nanoseconds - that is only due
> to numerical limitations. Neither of those is correct, and that is what
> brings this timestamp precision into question. This only arises from the
> fact that we cannot preserve causality of events with the same
> (numerical value of) timestamp correctly. That's why I'd say it would be
> better to actually fix how we treat reality. Whether it would be by
> creating Beam's Timestamp with this ability, or adding this to
> WindowedValue along with the other metadata (timestamp, window, pane,
> ...) is an implementation detail. Still, there is a question of whether we
> can actually do that, because that would affect output timestamps of user
> transforms (correct them, actually, even though it is a breaking change).

This is point (3) above--I think it would be very surprising if
extracting and then immediately setting the timestamp is anything but
a no-op. If native libraries (like Java Instant) had a notion of
minus-epsilon then I would want to use it.

> On 10/31/19 6:07 PM, Robert Bradshaw wrote:
> > On Thu, Oct 31, 2019 at 1:49 AM Jan Lukavský  wrote:
> >
> >>   > This is quite an interesting idea. In some sense, timestamps become
> >> like interval windows, and window assignment is similar to the window
> >> mapping fns that we use for side inputs. I still think the idea of a
> >> timestamp for an element and a window for an element is needed (e.g. one
> >> can have elements in a single window, especially the global window that
> >> have different timestamps), but this could be interesting to explore. It
> >> could definitely get rid of the "minus epsilon" weirdness, though I
> >> don't think it completely solves the granularity issues.
> >>
> >> Thinking about this a little more - maybe we actually don't need a time
> >> interval (not sure), it might be sufficient to actually add a single bit
> >> to the WindowedValue. That bit would be "the timestamp is PRECISELY this
> >> one" or "the timestamp is limiting value". This flag would have to
> >> propagate to timer settings (that is "set timer to exactly timestamp T"
> >> or "set timer as close to T as possible"). Then window timers at the end
> >> of window would set timers with this "limiting" setting (note that
> >> window.maxTimestamp() would have to change definition to be the first
> >> timestamp strictly greater than any timestamp that belongs to the window
> >> - it will actually be the first timestamp of new window with "limit"
> >> flag on). The impact on timers would be that events fired from @OnTimer
> >> would just propagate the flag to the WindowedValue being output.
> >>
> >> That way it seems to not matter how SDK internally handles time
> >> precision, as it would be transparent (at least seems to me). It is
> >> actually precisely what you proposed as "minus epsilon", only taken to
> >> the extreme. Looks useful to me and seems not that hard to implement.
> >> Although it would be of course a somewhat breaking change, because
> >> outputs of windows would become "3:00:00.000" instead of "2:59:59.999"
> >> (but I like the first one much more! :))
> > Yes, this is the "minus epsilon" idea, but assigning this as a bit on
> > the WindowedValue rather than on the Timestamp itself. This means that
> > pulling the timestamp out then re-assigning it would be lossy. (As a
> > basic example, imagine the batching DoFn that batches up elements
> > (with their respective metadata), calls an external service on the
> > full batch, and then emits the results (with the appropriate, cached,
> > metadata).
> >
> >> On 10/30/19 10:32 PM, Robert Bradshaw wrote:
> >>> On Wed, Oct 30, 

Re: Python Beam pipelines on Flink on Kubernetes

2019-11-01 Thread Chad Dombrova
Hi Thomas,
Do you have an example Dockerfile demonstrating best practices for building
an image that contains both Flink and Beam SDK dependencies?  That would be
useful.

-chad


On Fri, Nov 1, 2019 at 10:18 AM Thomas Weise  wrote:

> For folks looking to run Beam on Flink on k8s, see update in [1]
>
> I also updated [2]
>
> TLDR: at this time the best option to run portable pipelines on k8s is to
> create container images that have both Flink and the SDK dependencies.
>
> I'm curious how much interest there is in using the official SDK container
> images and keeping Flink and the portable pipeline separate as far as the
> image build goes? Deployment can be achieved with the sidecar container
> approach. Most of the mechanics are in place already; one addition would be
> an abstraction for where the SDK entry point executes (process, external),
> similar to how we have it for the workers.
>
> Thanks,
> Thomas
>
> [1]
> https://lists.apache.org/thread.html/10aada9c200d4300059d02e4baa47dda0cc2c6fc9432f950194a7f5e@%3Cdev.beam.apache.org%3E
>
> [2]
> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI
>
> On Wed, Aug 21, 2019 at 9:44 PM Thomas Weise  wrote:
>
>> The changes to containerize the Python SDK worker pool are nearly
>> complete. I also updated the document for next implementation steps.
>>
>> The favored approach (initially targeted option) for pipeline submission
> is support for the (externally created) fat jar. It will keep changes to
>> the operator to a minimum and is applicable for any other Flink job as well.
>>
>> Regarding fine grained resource scheduling, that would happen within the
>> pods scheduled by the k8s operator (or other external orchestration tool)
>> or, further down the road, in a completely elastic/dynamic fashion with
>> active execution mode (where Flink would request resources directly from
>> k8s, similar to how it would work on YARN).
>>
>>
>> On Tue, Aug 13, 2019 at 10:59 AM Chad Dombrova  wrote:
>>
>>> Hi Thomas,
>>> Nice work!  It's really clearly presented.
>>>
>>> What's the current favored approach for pipeline submission?
>>>
>>> I'm also interested to know how this plan overlaps (if at all) with the
>>> work on Fine-Grained Resource Scheduling [1][2] that's being done for Flink
>>> 1.9+, which has implications for creation of task managers in kubernetes.
>>>
>>> [1]
>>> https://docs.google.com/document/d/1h68XOG-EyOFfcomd2N7usHK1X429pJSMiwZwAXCwx1k/edit#heading=h.72szmms7ufza
>>> [2] https://issues.apache.org/jira/browse/FLINK-12761
>>>
>>> -chad
>>>
>>>
>>> On Tue, Aug 13, 2019 at 9:14 AM Thomas Weise  wrote:
>>>
 There have been a few comments in the doc and I'm going to start
 working on the worker execution part.

 Instead of running a Docker container for each worker, the preferred
 option is to use the worker pool:


 https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.bzs6y6ms0898

 There are some notes at the end of the doc regarding the
 implementation. It would be great if those interested in the environments
 and Python docker container could take a look. In a nutshell, the proposal
 is to make a few changes to the Python container so that it (optionally) can
 be used to run the worker pool.

 Thanks,
 Thomas


 On Wed, Jul 24, 2019 at 9:42 PM Pablo Estrada 
 wrote:

> I am very happy to see this. I'll take a look, and leave my comments.
>
> I think this is something we'd been needing, and it's great that you
> guys are putting thought into it. Thanks!<3
>
> On Wed, Jul 24, 2019 at 9:01 PM Thomas Weise  wrote:
>
>> Hi,
>>
>> Recently Lyft open sourced *FlinkK8sOperator,* a Kubernetes operator
>> to manage Flink deployments on Kubernetes:
>>
>> https://github.com/lyft/flinkk8soperator/
>>
>> We are now discussing how to extend this operator to also support
>> deployment of Python Beam pipelines with the Flink runner. I would like 
>> to
>> share the proposal with the Beam community to enlist feedback as well as
>> explore opportunities for collaboration:
>>
>>
>> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/
>>
>> Looking forward to your comments and suggestions!
>>
>> Thomas
>>
>>


Re: Proposal: Dynamic timer support (BEAM-6857)

2019-11-01 Thread Reuven Lax
Hi Jan,

Your proposal has merit, but I think using the TimerFamily specification is
more consistent with the existing API. I think that a StateFamily can also
have domains just like timers.

Luke's suggestion for the proto changes sounds good.

Reuven

On Tue, Oct 29, 2019 at 2:43 AM Jan Lukavský  wrote:

> Hi Reuven,
>
> I didn't propose to restrict the model. The model can (and should) have
> multiple timers per key, even dynamic ones. The question was whether this can
> be made efficient by using a single timer (after all, the runner will probably
> have a single "timer service", so no matter what we expose at the API level,
> this will end up being multiplexed in the runner). And it might have the
> additional benefit of preventing bugs. But I'm not proposing to make this
> change for existing timers; that was more a question of whether we really
> must force runners to implement dynamic timers, or whether we can do it
> in the translation layer generically for all runners at once.
>
> Regarding the API - which is again an independent question of how it will be
> implemented - what do we need the @TimerFamily TimerSpec declaration for? I
> see two reasons:
>
>  a) it holds the time domain
>
>  b) it declares the DoFn as being stateful
>
> Property a) looks like it can be specified when setting the timer. b)
> could be inferred from @ProcessElement (or other method). What about
> class MyDoFn extends DoFn {
>   @ProcessElement
>   // declares @TimerContext which implies stateful DoFn
>   public void process(@Element String e, @TimerContext TimerContext timers)
> {
> Timer timer1 = timers.get("timer1", EVENT_TIME);
> Timer timer2 = timers.get("timer2", PROCESSING_TIME);
> timer1.set(...);
> timer2.set(...);
>   }
>
>   // empty name might be allowed iff the declaration contains
> @TimerContext, so that declares using dynamic timers
>   @OnTimer public void onTimer(@TimerId String timerFired, @Timestamp
> Instant timerTs, @TimerContext TimerContext timers) { ... }
> }
>
> I'm still seeking the analogy with dynamic state, because in this API,
> that might become
>
> class MyDoFn extends DoFn {
>   @ProcessElement
>   public void process(@Element String e, @StateContext StateContext
> states) {
> ValueState state = states.get("myDynamicState", StateSpec...);
> state.get(...)
> state.set(...)
>   }
> }
>
> The point is that there seems to be no use for any declaration like
> @TimerFamily in case of dynamic state, because there is no domain. It would
> feel weird to have to declare something for dynamic timers and not have to
> do it for state.
>
> Jan
>
> On 10/29/19 6:56 AM, Reuven Lax wrote:
>
> Just to circle back around, after the discussion on this thread I propose
> modifying the proposed API as follows:
>
> class MyDoFn extends DoFn {
>   @TimerFamily("timers") TimerSpec timers =
> TimerSpecs.timerFamily(TimeDomain.EVENT_TIME);
>
>   @ProcessElement
>   public void process(@Element String e, @TimerFamily("timers") TimerMap
> timers) {
> timers.set("timer1", ...);
> timers.set("timer2", ...);
>   }
>
>   @OnTimer("timer") public void onTimer(@TimerId String timerFired,
> @Timestamp Instant timerTs, @TimerFamily("timers") TimerMap timers) { ...
> }
> }
>
> Discussions around exposing DoFnSignature and DoFnInvoker to DSL authors
> are a bit independent (though not completely so, as it does relate), so I
> suggest splitting that into a separate discussion.
>
> Reuven
>
> On Mon, Oct 28, 2019 at 10:52 PM Reuven Lax  wrote:
>
>>
>>
>> On Wed, Oct 23, 2019 at 1:21 AM Jan Lukavský  wrote:
>>
>>> Hi Reuven,
>>>
>>> yes, if this change is intended to be used by end users, then
>>> DoFnSignatures cannot be used, agree on that. Regarding the relationship
>>> with dynamic state - I agree that this is a separate problem, but because it
>>> is close enough, we should know how we want to deal with that. Because
>>> state and timers share some functionality (after all, timers need state to
>>> be fault tolerant), these APIs should IMO share the same logic. Whatever
>>> solution is chosen to expose dynamic timers, it should extend to dynamic state.
>>>
>>> I'd like to pause a little on the premise that users want dynamic
>>> timers (that is, timers whose *name* - and therefore behavior - is
>>> determined by incoming data). Could this case be modeled so that the timer
>>> actually has one more (implicit) state variable that holds a
>>> collection of tuples (timestamp, name)? Then the timer would be invoked at
>>> the given (minimum of all currently set) timestamp with the respective name.
>>> The question here probably is - can this have a performance impact? That is
>>> to say - can any runner actually do anything different from this in the
>>> sense of time complexity of the algorithm?
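
A hypothetical sketch of that multiplexing, expressed with the existing per-key
state and timer API; the class name, state names, and input shape are made up for
illustration and are not part of the proposal:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

// Input: key -> (logical timer name, desired firing time).
class MultiplexedTimersFn extends DoFn<KV<String, KV<String, Instant>>, String> {
  // All pending logical timers for the key, stored as (name, timestamp) pairs.
  @StateId("pending")
  private final StateSpec<BagState<KV<String, Instant>>> pendingSpec =
      StateSpecs.bag(KvCoder.of(StringUtf8Coder.of(), InstantCoder.of()));

  // The single physical timer all logical timers are multiplexed onto.
  @TimerId("mux")
  private final TimerSpec muxSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, KV<String, Instant>> element,
      @StateId("pending") BagState<KV<String, Instant>> pending,
      @TimerId("mux") Timer mux) {
    pending.add(element.getValue());
    // Simplification: a real implementation would only ever move the physical
    // timer earlier, never later.
    mux.set(element.getValue().getValue());
  }

  @OnTimer("mux")
  public void onTimer(
      OnTimerContext context,
      @StateId("pending") BagState<KV<String, Instant>> pending,
      @TimerId("mux") Timer mux) {
    List<KV<String, Instant>> remaining = new ArrayList<>();
    Instant earliestRemaining = null;
    for (KV<String, Instant> logical : pending.read()) {
      if (!logical.getValue().isAfter(context.timestamp())) {
        // This logical timer is due: dispatch it by name.
        context.output("fired logical timer: " + logical.getKey());
      } else {
        remaining.add(logical);
        if (earliestRemaining == null || logical.getValue().isBefore(earliestRemaining)) {
          earliestRemaining = logical.getValue();
        }
      }
    }
    pending.clear();
    for (KV<String, Instant> logical : remaining) {
      pending.add(logical);
    }
    if (earliestRemaining != null) {
      // Re-arm the physical timer for the earliest still-pending logical timer.
      mux.set(earliestRemaining);
    }
  }
}

The efficiency concern raised in the reply below is about exactly this kind of
user-level multiplexing.
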
>>>
>>
>> Yes - you could always multiplex many timers onto one. This is what some
>> users do today, but it tends to be very inefficient and also complex. The
>> Beam model requires runners to support dynamic timers per key 

Re: Strict timer ordering in Samza and Portable Flink Runners

2019-11-01 Thread Reuven Lax
FYI Dataflow is working on adding support for TestStream. The fact that
these tests don't expose such problems on Dataflow is evidence that
TestStream support is needed.

Reuven

On Fri, Nov 1, 2019 at 10:21 AM Kenneth Knowles  wrote:

> Indeed. Thanks for looking through all the runners' support for this.
>
> I have reproduced it and filed
> https://issues.apache.org/jira/browse/BEAM-8543.
>
> The Dataflow integration tests are slow and expensive so I don't want to
> add a new test suite right now. We have internal coverage of this, just at
> a delay.
>
> Kenn
>
> On Fri, Nov 1, 2019 at 1:51 AM Jan Lukavský  wrote:
>
>> Okay, that makes sense. I'm not sure how to fix this, though. Can I
>> suppose that someone from Dataflow team will take care of that?
>> On 11/1/19 12:16 AM, Kenneth Knowles wrote:
>>
>> It is because Dataflow does not support TestStream, so one test is
>> disabled, and because the other test has only bounded inputs it is run in
>> batch mode. In this case we need to do either: force streaming mode on
>> Dataflow or have an unbounded input. We used to run two Validates Runner
>> suites, where one of them is forced to streaming mode for all tests. We
>> really would like to also run that, actually. I don't see it anymore.
>>
>> Kenn
>>
>> On Thu, Oct 31, 2019 at 10:43 AM Jan Lukavský  wrote:
>>
>>> That is quite strange. The timer ordering tests were quite stable on
>>> DirectRunner. Prior to the fix it failed consistently. Dataflow on the
>>> other hand seems to consistently pass.
>>> On 10/31/19 6:28 PM, Kenneth Knowles wrote:
>>>
>>> Hmm, classical Dataflow should fail.
>>>
>>>  - all user timers in a bundle processed first:
>>> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L353
>>>  - processed in a loop that drains the StepContext:
>>> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L451
>>>  - the context just feeds the iterable for the current bundle (no
>>> priority queue of newly set timers):
>>> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java#L550
>>>
>>> Looks like we need some more tests.
>>>
>>> Kenn
>>>
>>> On Thu, Oct 31, 2019 at 10:06 AM Jan Lukavský  wrote:
>>>
 Hi,

 just today I noticed failures on portable dataflow [1] [2]. "Classical"
 dataflow seems to pass.

 Jan

 [1] https://issues.apache.org/jira/browse/BEAM-8530

 [2] https://github.com/apache/beam/pull/9951
 On 10/31/19 5:29 PM, Reuven Lax wrote:

 Have you seen these failures on Dataflow as well? From code examination
 I would expect Dataflow to have some bugs in this area as well (especially
 if a timer is set while processing a bundle). If the tests are passing on
 Dataflow this might mean that we need different tests (or it might mean
 that Dataflow is "working" for some mysterious reason that is not obvious
 from the code :) ).

 On Wed, Oct 23, 2019 at 2:54 AM Jan Lukavský  wrote:

> Hi,
>
> as part of [1] a new set of validatesRunner tests has been introduced.
> These tests (currently marked as category UsesStrictTimerOrdering)
> verify that runners fire timers in increasing timestamp order under all
> circumstances. After adding these validatesRunner tests, Samza [2] and
> Portable Flink [3] started to fail these tests. I have created the
> tracking issues for that, because that behavior should be fixed
> (timers
> in wrong order can cause erratic behavior and/or data loss).
>
> I'm writing to anyone interested in solving these issues.
>
> Cheers,
>
>   Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-7520
>
> [2] https://issues.apache.org/jira/browse/BEAM-8459
>
> [3] https://issues.apache.org/jira/browse/BEAM-8460
>
>


Re: published containers overwrite locally built containers

2019-11-01 Thread Chamikara Jayalath
I think it makes sense to override published docker images with locally
built versions when testing HEAD.

Thanks,
Cham

On Thu, Oct 31, 2019 at 6:31 PM Heejong Lee  wrote:

> Hi, happy halloween!
>
> I'm looking into failing cross language post commit tests:
> https://issues.apache.org/jira/browse/BEAM-8534
> 
>
> After a few runs, I've found that published SDK harness containers
> overwrite locally built containers when docker pull happens. I can think of
> two possible solutions here: 1) remove the published images with the latest
> tag, so that the image with the latest tag is available for testing and
> development purposes. 2) add a serialVersionUID to the class printing out the
> error.
>
> 2) doesn't sound like a fundamental solution if we're not going to attach
> serialVersionUID to all serializable classes. 1) might work but I'm not
> sure whether there's another use for the latest tag somewhere. Any better
> ideas?
>
> Thanks,
> Heejong
>


Re: Strict timer ordering in Samza and Portable Flink Runners

2019-11-01 Thread Kenneth Knowles
Indeed. Thanks for looking through all the runners' support for this.

I have reproduced it and filed
https://issues.apache.org/jira/browse/BEAM-8543.

The Dataflow integration tests are slow and expensive so I don't want to
add a new test suite right now. We have internal coverage of this, just at
a delay.

Kenn

On Fri, Nov 1, 2019 at 1:51 AM Jan Lukavský  wrote:

> Okay, that makes sense. I'm not sure how to fix this, though. Can I
> suppose that someone from Dataflow team will take care of that?
> On 11/1/19 12:16 AM, Kenneth Knowles wrote:
>
> It is because Dataflow does not support TestStream, so one test is
> disabled, and because the other test has only bounded inputs it is run in
> batch mode. In this case we need to do either: force streaming mode on
> Dataflow or have an unbounded input. We used to run two Validates Runner
> suites, where one of them is forced to streaming mode for all tests. We
> really would like to also run that, actually. I don't see it anymore.
>
> Kenn
>
> On Thu, Oct 31, 2019 at 10:43 AM Jan Lukavský  wrote:
>
>> That is quite strange. The timer ordering tests were quite stable on
>> DirectRunner. Prior to the fix it failed consistently. Dataflow on the
>> other hand seems to consistently pass.
>> On 10/31/19 6:28 PM, Kenneth Knowles wrote:
>>
>> Hmm, classical Dataflow should fail.
>>
>>  - all user timers in a bundle processed first:
>> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L353
>>  - processed in a loop that drains the StepContext:
>> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L451
>>  - the context just feeds the iterable for the current bundle (no
>> priority queue of newly set timers):
>> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java#L550
>>
>> Looks like we need some more tests.
>>
>> Kenn
>>
>> On Thu, Oct 31, 2019 at 10:06 AM Jan Lukavský  wrote:
>>
>>> Hi,
>>>
>>> just today I noticed failures on portable dataflow [1] [2]. "Classical"
>>> dataflow seems to pass.
>>>
>>> Jan
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-8530
>>>
>>> [2] https://github.com/apache/beam/pull/9951
>>> On 10/31/19 5:29 PM, Reuven Lax wrote:
>>>
>>> Have you seen these failures on Dataflow as well? From code examination
>>> I would expect Dataflow to have some bugs in this area as well (especially
>>> if a timer is set while processing a bundle). If the tests are passing on
>>> Dataflow this might mean that we need different tests (or it might mean
>>> that Dataflow is "working" for some mysterious reason that is not obvious
>>> from the code :) ).
>>>
>>> On Wed, Oct 23, 2019 at 2:54 AM Jan Lukavský  wrote:
>>>
 Hi,

 as part of [1] a new set of validatesRunner tests has been introduced.
 These tests (currently marked as category UsesStrictTimerOrdering)
 verify that runners fire timers in increasing timestamp order under all
 circumstances. After adding these validatesRunner tests, Samza [2] and
 Portable Flink [3] started to fail these tests. I have created the
 tracking issues for that, because that behavior should be fixed (timers
 in wrong order can cause erratic behavior and/or data loss).

 I'm writing to anyone interested in solving these issues.

 Cheers,

   Jan

 [1] https://issues.apache.org/jira/browse/BEAM-7520

 [2] https://issues.apache.org/jira/browse/BEAM-8459

 [3] https://issues.apache.org/jira/browse/BEAM-8460




Re: Python Beam pipelines on Flink on Kubernetes

2019-11-01 Thread Thomas Weise
For folks looking to run Beam on Flink on k8s, see update in [1]

I also updated [2]

TLDR: at this time the best option to run portable pipelines on k8s is to
create container images that have both Flink and the SDK dependencies.

I'm curious how much interest there is in using the official SDK container
images and keeping Flink and the portable pipeline separate as far as the
image build goes? Deployment can be achieved with the sidecar container
approach. Most of the mechanics are in place already; one addition would be
an abstraction for where the SDK entry point executes (process, external),
similar to how we have it for the workers.

Thanks,
Thomas

[1]
https://lists.apache.org/thread.html/10aada9c200d4300059d02e4baa47dda0cc2c6fc9432f950194a7f5e@%3Cdev.beam.apache.org%3E

[2]
https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI

On Wed, Aug 21, 2019 at 9:44 PM Thomas Weise  wrote:

> The changes to containerize the Python SDK worker pool are nearly
> complete. I also updated the document for next implementation steps.
>
> The favored approach (initially targeted option) for pipeline submission
> is support for the (externally created) fat jar. It will keep changes to
> the operator to a minimum and is applicable for any other Flink job as well.
>
> Regarding fine grained resource scheduling, that would happen within the
> pods scheduled by the k8s operator (or other external orchestration tool)
> or, further down the road, in a completely elastic/dynamic fashion with
> active execution mode (where Flink would request resources directly from
> k8s, similar to how it would work on YARN).
>
>
> On Tue, Aug 13, 2019 at 10:59 AM Chad Dombrova  wrote:
>
>> Hi Thomas,
>> Nice work!  It's really clearly presented.
>>
>> What's the current favored approach for pipeline submission?
>>
>> I'm also interested to know how this plan overlaps (if at all) with the
>> work on Fine-Grained Resource Scheduling [1][2] that's being done for Flink
>> 1.9+, which has implications for creation of task managers in kubernetes.
>>
>> [1]
>> https://docs.google.com/document/d/1h68XOG-EyOFfcomd2N7usHK1X429pJSMiwZwAXCwx1k/edit#heading=h.72szmms7ufza
>> [2] https://issues.apache.org/jira/browse/FLINK-12761
>>
>> -chad
>>
>>
>> On Tue, Aug 13, 2019 at 9:14 AM Thomas Weise  wrote:
>>
>>> There have been a few comments in the doc and I'm going to start working
>>> on the worker execution part.
>>>
>>> Instead of running a Docker container for each worker, the preferred
>>> option is to use the worker pool:
>>>
>>>
>>> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.bzs6y6ms0898
>>>
>>> There are some notes at the end of the doc regarding the implementation.
>>> It would be great if those interested in the environments and Python docker
>>> container could take a look. In a nutshell, the proposal is to make a few
>>> changes to the Python container so that it (optionally) can be used to run
>>> the worker pool.
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>> On Wed, Jul 24, 2019 at 9:42 PM Pablo Estrada 
>>> wrote:
>>>
 I am very happy to see this. I'll take a look, and leave my comments.

 I think this is something we'd been needing, and it's great that you
 guys are putting thought into it. Thanks!<3

 On Wed, Jul 24, 2019 at 9:01 PM Thomas Weise  wrote:

> Hi,
>
> Recently Lyft open sourced *FlinkK8sOperator,* a Kubernetes operator
> to manage Flink deployments on Kubernetes:
>
> https://github.com/lyft/flinkk8soperator/
>
> We are now discussing how to extend this operator to also support
> deployment of Python Beam pipelines with the Flink runner. I would like to
> share the proposal with the Beam community to enlist feedback as well as
> explore opportunities for collaboration:
>
>
> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/
>
> Looking forward to your comments and suggestions!
>
> Thomas
>
>


Re: Python SDK timestamp precision

2019-11-01 Thread Jan Lukavský
> Yes, this is the "minus epsilon" idea, but assigning this as a bit on 
the WindowedValue rather than on the Timestamp itself. This means that 
pulling the timestamp out then re-assigning it would be lossy. (As a 
basic example, imagine the batching DoFn that batches up elements (with 
their respective metadata), calls an external service on the full batch, 
and then emits the results (with the appropriate, cached, metadata).


I'm not sure if I follow - I'd say that in the example the "limit bit" 
could be part of the metadata, so that if the external service returns a 
response, the limiting bit could be added back. Actually a PTransform 
for manipulating this limit bit should be accessible by users, 
because it can be useful when reading data from an external service (e.g. 
Kafka).


Yes, the current approach of subtracting 1 (millisecond, nanosecond, 
picosecond, whatever) ensures that we don't have to hold this metadata, 
because it is part of the timestamp. But we are not modeling the reality 
correctly. In reality the output for a window with range 
<02:00:00,03:00:00) (half-closed) cannot happen *before* 03:00:00. Yes, 
its validity interval might depend on the precision - 
<02:00:00-02:59:59.999> in case of milliseconds, 
<02:00:00-02:59:59.999999999> in case of nanoseconds - that is only due 
to numerical limitations. Neither of those is correct, and that is what 
brings this timestamp precision into question. This only arises from the 
fact that we cannot preserve causality of events with the same 
(numerical value of) timestamp correctly. That's why I'd say it would be 
better to actually fix how we treat reality. Whether it would be by 
creating Beam's Timestamp with this ability, or adding this to 
WindowedValue along with the other metadata (timestamp, window, pane, 
...) is an implementation detail. Still, there is a question of whether we 
can actually do that, because that would affect output timestamps of user 
transforms (correct them, actually, even though it is a breaking change).
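
Purely as an illustration, here is a sketch of what such a "limiting" timestamp
could look like as a plain value type. This is not an existing Beam class, and the
real change would more likely live on WindowedValue or a Beam-specific timestamp:

import org.joda.time.Instant;

final class LimitAwareTimestamp implements Comparable<LimitAwareTimestamp> {
  private final Instant instant;
  private final boolean isLimit;  // true => "just before instant", i.e. minus epsilon

  private LimitAwareTimestamp(Instant instant, boolean isLimit) {
    this.instant = instant;
    this.isLimit = isLimit;
  }

  static LimitAwareTimestamp exactly(Instant instant) {
    return new LimitAwareTimestamp(instant, false);
  }

  static LimitAwareTimestamp limitOf(Instant endOfWindow) {
    return new LimitAwareTimestamp(endOfWindow, true);
  }

  @Override
  public int compareTo(LimitAwareTimestamp other) {
    int byInstant = instant.compareTo(other.instant);
    if (byInstant != 0) {
      return byInstant;
    }
    // A limit timestamp sorts before an exact timestamp with the same numeric
    // value, so a window output labeled "03:00:00 (limit)" still precedes an
    // event stamped exactly 03:00:00.
    return Boolean.compare(other.isLimit, isLimit);
  }

  @Override
  public String toString() {
    return instant + (isLimit ? " (exclusive limit)" : "");
  }
}
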


Jan

On 10/31/19 6:07 PM, Robert Bradshaw wrote:

On Thu, Oct 31, 2019 at 1:49 AM Jan Lukavský  wrote:


  > This is quite an interesting idea. In some sense, timestamps become
like interval windows, and window assignment is similar to the window
mapping fns that we use for side inputs. I still think the idea of a
timestamp for an element and a window for an element is needed (e.g. one
can have elements in a single window, especially the global window that
have different timestamps), but this could be interesting to explore. It
could definitely get rid of the "minus epsilon" weirdness, though I
don't think it completely solves the granularity issues.

Thinking about this a little more - maybe we actually don't need a time
interval (not sure), it might be sufficient to actually add a single bit
to the WindowedValue. That bit would be "the timestamp is PRECISELY this
one" or "the timestamp is limiting value". This flag would have to
propagate to timer settings (that is "set timer to exactly timestamp T"
or "set timer as close to T as possible"). Then window timers at the end
of window would set timers with this "limiting" setting (note that
window.maxTimestamp() would have to change definition to be the first
timestamp strictly greater than any timestamp that belongs to the window
- it will actually be the first timestamp of new window with "limit"
flag on). The impact on timers would be that events fired from @OnTimer
would just propagate the flag to the WindowedValue being output.

That way it seems to not matter how SDK internally handles time
precision, as it would be transparent (at least seems to me). It is
actually precisely what you proposed as "minus epsilon", only taken to
the extreme. Looks useful to me and seems not that hard to implement.
Although it would be of course a somewhat breaking change, because
outputs of windows would become "3:00:00.000" instead of "2:59:59.999"
(but I like the first one much more! :))

Yes, this is the "minus epsilon" idea, but assigning this as a bit on
the WindowedValue rather than on the Timestamp itself. This means that
pulling the timestamp out then re-assigning it would be lossy. (As a
basic example, imagine the batching DoFn that batches up elements
(with their respective metadata), calls an external service on the
full batch, and then emits the results (with the appropriate, cached,
metadata).


On 10/30/19 10:32 PM, Robert Bradshaw wrote:

On Wed, Oct 30, 2019 at 2:00 AM Jan Lukavský  wrote:

TL;DR - can we solve this by representing aggregations as not point-wise
events in time, but time ranges? Explanation below.

Hi,

this is pretty interesting from a theoretical point of view. The
question generally seems to be - having two events, can I reliably order
them? One event might be end of one window and the other event might be
start of another window. There is strictly required causality in this
(although these events in fact have the same limiting 

Re: Strict timer ordering in Samza and Portable Flink Runners

2019-11-01 Thread Jan Lukavský
Okay, that makes sense. I'm not sure how to fix this, though. Can I 
suppose that someone from Dataflow team will take care of that?


On 11/1/19 12:16 AM, Kenneth Knowles wrote:
It is because Dataflow does not support TestStream, so one test is 
disabled, and because the other test has only bounded inputs it is run 
in batch mode. In this case we need to do either: force streaming mode 
on Dataflow or have an unbounded input. We used to run two Validates 
Runner suites, where one of them is forced to streaming mode for all 
tests. We really would like to also run that, actually. I don't see it 
anymore.


Kenn

On Thu, Oct 31, 2019 at 10:43 AM Jan Lukavský  wrote:


That is quite strange. The timer ordering tests were quite stable
on DirectRunner. Prior to the fix it failed consistently. Dataflow
on the other hand seems to consistently pass.

On 10/31/19 6:28 PM, Kenneth Knowles wrote:

Hmm, classical Dataflow should fail.

 - all user timers in a bundle processed first:

https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L353
 - processed in a loop that drains the StepContext:

https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L451
 - the context just feeds the iterable for the current bundle (no
priority queue of newly set timers):

https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java#L550

Looks like we need some more tests.

Kenn

On Thu, Oct 31, 2019 at 10:06 AM Jan Lukavský <je...@seznam.cz> wrote:

Hi,

just today I noticed failures on portable dataflow [1] [2].
"Classical" dataflow seems to pass.

Jan

[1] https://issues.apache.org/jira/browse/BEAM-8530

[2] https://github.com/apache/beam/pull/9951

On 10/31/19 5:29 PM, Reuven Lax wrote:

Have you seen these failures on Dataflow as well? From code
examination I would expect Dataflow to have some bugs in
this area as well (especially if a timer is set while
processing a bundle). If the tests are passing on Dataflow
this might mean that we need different tests (or it might
mean that Dataflow is "working" for some mysterious reason
that is not obvious from the code :) ).

On Wed, Oct 23, 2019 at 2:54 AM Jan Lukavský <je...@seznam.cz> wrote:

Hi,

as part of [1] a new set of validatesRunner tests has
been introduced.
These tests (currently marked as category
UsesStrictTimerOrdering)
verify that runners fire timers in increasing timestamp order
under all
circumstances. After adding these validatesRunner tests,
Samza [2] and
Portable Flink [3] started to fail these tests. I have
created the
tracking issues for that, because that behavior should
be fixed (timers
in wrong order can cause erratic behavior and/or data loss).

I'm writing to anyone interested in solving these issues.

Cheers,

  Jan

[1] https://issues.apache.org/jira/browse/BEAM-7520

[2] https://issues.apache.org/jira/browse/BEAM-8459

[3] https://issues.apache.org/jira/browse/BEAM-8460



Re: RabbitMqIO issues and open PRs

2019-11-01 Thread Jean-Baptiste Onofré
Hi,

I just provided feedback in the PRs.

Let me know if you want to chat about some initial implementation (as
I'm the original author of the IO, I remember some discussion in the
past ;) ).

Regards
JB

On 31/10/2019 21:38, Daniel Robert wrote:
> I'm pretty new to the Beam ecosystem, so apologies if this is not the
> right forum for this.
> 
> My team has been learning and starting to use Beam for the past few
> months and has run into myriad problems with the RabbitMqIO connector for
> Java, aspects of which seem perhaps fundamentally broken or incorrect in
> the released implementation. I've tracked our significant issues down
> and opened tickets and PRs for them. I'm not certain what the typical
> response time is, but given the severity of the issues (as I perceive
> them), I'd like to escalate some of these PRs and try to get some fixes
> into the next Beam release.
> 
> I originally opened BEAM-8390 (https://github.com/apache/beam/pull/9782)
> as it was impossible to set the 'useCorrelationId' property (implying
> this functionality was also untested). Since then, I've found and PR'd
> the following, which are awaiting feedback/approval:
> 
> 1. Watermarks not advancing
> 
> Ticket/PR: BEAM-8347 - https://github.com/apache/beam/pull/9820
> 
> Impact: under low message volumes, the watermark never advances and
> windows can never 'on time' fire.
> 
> Notes: The RabbitMq UnboundedSource uses the 'oldest known time' as the
> watermark, whereas other similar sources (and the documentation on
> watermarking) state that for monotonically increasing timestamps (the case
> with a queue) it should be the most recent time. I have a few open questions
> about testing and implementation details in the PR, but it should work as-is.
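
A plain-Java sketch of the watermark behavior argued for here (not the actual
RabbitMqIO code); an UnboundedReader's getWatermark() could return the value
tracked this way:

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

class MostRecentTimestampWatermark {
  // Far in the past until the first record is observed.
  private Instant mostRecent = BoundedWindow.TIMESTAMP_MIN_VALUE;

  // Called for every record read from the queue.
  void observe(Instant recordTimestamp) {
    if (recordTimestamp.isAfter(mostRecent)) {
      mostRecent = recordTimestamp;
    }
  }

  // With monotonically increasing timestamps this advances with every record,
  // instead of staying pinned to the oldest known time.
  Instant currentWatermark() {
    return mostRecent;
  }
}
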
> 
> 2. Exchanges are always declared, which fail if a pre-existing exchange
> differs
> 
> Ticket/PR: BEAM-8513 - https://github.com/apache/beam/pull/9937
> 
> Impact: It is impossible to utilize an existing, durable exchange.
> 
> Notes: I'm hooking Beam up to an existing topic exchange (an 'event
> bus') that is 'durable'. RabbitMqIO's current implementation will always
> attempt to declare the exchange, and does so as non-durable, which
> causes RabbitMQ to fail the declaration. (Interestingly, Qpid does not fail
> in this scenario.) The PR allows the caller to disable declaring the
> exchange, similar to `withQueueDeclare` for declaring a queue.
> 
> This PR also calls out a lot of the documentation that seems misleading,
> implying that one either interacts with queues *or* exchanges when that
> is not how AMQP fundamentally operates. The implementation of the
> RabbitMqIO connector before this PR seems like it probably works with
> the default exchange and maybe a fanout exchange, but not a topic exchange.
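
For context, a standalone sketch against the RabbitMQ Java client (not Beam code)
of why re-declaring an existing durable exchange with different properties fails,
and how a passive declare avoids it; the broker URI and exchange name are made up:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ExchangeDeclareSketch {
  public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUri("amqp://guest:guest@localhost:5672/%2F");  // illustrative URI
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    try {
      // Suppose the 'events' topic exchange already exists and is durable.
      // Re-declaring it as non-durable makes RabbitMQ close the channel with a
      // PRECONDITION_FAILED error:
      // channel.exchangeDeclare("events", "topic", false);

      // A passive declare only asserts that the exchange exists, without
      // redefining its properties, so it is safe against pre-existing exchanges.
      channel.exchangeDeclarePassive("events");
    } finally {
      channel.close();
      connection.close();
    }
  }
}
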
> 
> 3. Library versions
> 
> Tickets/PR: BEAM-7434, BEAM-5895, and BEAM-5894 -
> https://github.com/apache/beam/pull/9900
> 
> Impact: The rabbitmq amqp client for java released the 5.x line in
> September of 2017. Some automated tickets were opened to upgrade, plus a
> manual ticket to drop the use of the deprecated QueueConsumer API.
> 
> Notes: The upgrade was relatively simple, but I implemented it using a
> pull-based API rather than push-based (Consumer) which may warrant some
> discussion. I'm used to discussing this type of thing over PRs but I'm
> happy to do whatever the community prefers.
> 
> ---
> 
> Numbers 1 and 2 above are 'dealbreaker' issues for my team. They
> effectively make rabbitmq unusable as an unbounded source, forcing
> developers to fork and modify the code. Number 3 is much less
> significant and I've put it here more for 'good, clean living' than an
> urgent issue.
> 
> Aside from the open issues, given the low response rate so far, I'd be
> more than happy to take on a more active role in maintaining/reviewing
> the rabbitmq io for java. For now, however, is this list the best way to
> 'bump' these open issues and move forward? Further, is the general
> approach before opening a PR to ask some preliminary questions in this
> email list?
> 
> Thank you,
> -Daniel Robert
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com