Re: RabbitMQ and CheckpointMark feasibility

2020-01-06 Thread Daniel Robert

Alright, a bit late but this took me a while.

Thanks for all the input so far. I have rewritten much of the RabbitMq 
IO connector and have it ready to go in a draft pr: 
https://github.com/apache/beam/pull/10509


This should incorporate a lot of what's been discussed here, in terms of 
watermarking, serialization, error handling, etc. It also 
clarifies/cleans up a lot of very confusing documentation/API settings 
pertaining to using 'queues vs exchanges' and adds clarifying 
documentation on various valid AMQP paradigms.


Watermarking/timestamp management is mostly stolen from KafkaIO and 
modified as appropriate.


This also does a lot to improve resource management in terms of 
Connection and Channel usage, largely modeled after JdbcIO's 
ConnectionHandlerProvider concept.
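Here is a minimal sketch of that idea. It assumes a generic connection type `C` standing in for a RabbitMQ `Connection`. `SharedConnectionProvider`, `acquire`, and `release` are hypothetical names. This is not the PR's code and not JdbcIO's API, only the general pattern of reference-counted sharing inside one JVM.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical: one shared connection per URI, closed when its last user releases it. */
final class SharedConnectionProvider<C extends AutoCloseable> {
  private static final class Entry<T> {
    final T connection;
    int refCount;

    Entry(T connection) {
      this.connection = connection;
    }
  }

  private final Map<String, Entry<C>> entries = new HashMap<>();
  private final Function<String, C> factory;

  SharedConnectionProvider(Function<String, C> factory) {
    this.factory = factory;
  }

  /** Returns the cached connection for this URI, opening it on first use. */
  synchronized C acquire(String uri) {
    Entry<C> entry = entries.computeIfAbsent(uri, u -> new Entry<>(factory.apply(u)));
    entry.refCount++;
    return entry.connection;
  }

  /** Drops one reference; closes and evicts the connection once no users remain. */
  synchronized void release(String uri) {
    Entry<C> entry = entries.get(uri);
    if (entry == null) {
      throw new IllegalStateException("release() without acquire() for " + uri);
    }
    entry.refCount--;
    if (entry.refCount == 0) {
      entries.remove(uri);
      try {
        entry.connection.close();
      } catch (Exception e) {
        throw new RuntimeException("failed to close connection for " + uri, e);
      }
    }
  }

  /** Number of distinct connections currently open. */
  synchronized int openConnections() {
    return entries.size();
  }
}
```

Sharing connections this way avoids opening one connection per reader. It cannot help a CheckpointMark that has been encoded and decoded on another worker, which is the problem discussed in the older messages of this thread.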


I'm not entirely sure how best to proceed from here, hence the email. 
It's a huge PR, but it has no specific backing ticket (it should), and 
historically there haven't been many eyes on RabbitMq PRs.


Thanks,
-Danny

On 11/14/19 4:13 PM, Jan Lukavský wrote:


On 11/14/19 9:50 PM, Daniel Robert wrote:

Alright, thanks everybody. I'm really appreciative of the 
conversation here. I think I see where my disconnect is and how this 
might all work together for me. There are some bugs in the current 
rabbit implementation that I think have confused my understanding of 
the intended semantics. I'm coming around to seeing how such a system 
with rabbit's restrictions can work properly in Beam (I'd totally 
forgotten about 'dedupe' support in Beam) but I want to clarify some 
implementation questions after pulling everyone's notes together.


The RabbitMQ reader should not bother accepting an existing 
CheckpointMark in its constructor (in 'ack-based' systems this is 
unnecessary per Eugene's original reply). It should construct its own 
CheckpointMark at construction time and use it throughout its lifecycle.


At some point later, the CheckpointMark will be 'finalized'. If this 
CheckpointMark has been serialized (via Coder or otherwise) or its 
underlying connection has been severed, this step will fail. This 
would mean at some point the messages are redelivered to Beam on some 
other Reader, so no data loss. If it has not been serialized, the 
acks will take place just fine, even if much later.


If the system is using processing-time as event-time, however, the 
redelivery of these messages would effectively change the ordering 
and potentially the window they arrived in. I *believe* Beam 
deduping is managed per-window, so if 'finalizeCheckpoint' is 
attempted (and fails), would these messages appear in a new window?


This is very likely to happen with any source that assigns something 
like *now* as the event time. That is ill-defined, and if the 
source cannot provide some retry-persistent estimate of the real 
event time, then I'd suggest forcing the user to specify a UDF that 
extracts event time from the payload. Anything else would probably 
break (at least if any timestamp-related windowing is used in 
the pipeline).
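Here is a minimal sketch of such a UDF. For illustration only, it assumes that each payload starts with its epoch-millisecond timestamp followed by `|`. `EventTimeExtraction` and that payload format are hypothetical. Because the timestamp comes from the message bytes, a redelivered copy lands in the same window as the original.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.function.Function;

final class EventTimeExtraction {
  /** Hypothetical UDF for payloads of the form "<epochMillis>|<body>". */
  static final Function<byte[], Instant> FROM_PAYLOAD = payload -> {
    String text = new String(payload, StandardCharsets.UTF_8);
    int sep = text.indexOf('|');
    if (sep < 0) {
      throw new IllegalArgumentException("no timestamp prefix in payload: " + text);
    }
    return Instant.ofEpochMilli(Long.parseLong(text.substring(0, sep)));
  };

  /** Start of the fixed window (of the given size) that contains the timestamp. */
  static long windowStart(Instant timestamp, long windowMillis) {
    long ms = timestamp.toEpochMilli();
    return ms - Math.floorMod(ms, windowMillis);
  }
}
```

Consider one-minute windows. A message stamped 60000 ms stays in the [60 s, 120 s) window however often it is redelivered. Stamping it with its read time instead would put a read at 65 s and a redelivery at 125 s into different windows.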


Perhaps my questions are now:
- how should a CheckpointMark communicate failure to Beam?

A thrown exception should fail the checkpoint and therefore cause 
everything since the last checkpoint to be retried.


- how does Beam handle a CheckpointMark.finalizeCheckpoint failure, 
if the API dictates such a thing?



See above.


- is there a provision that would need to be made for processing-time 
sources that can fail a CheckpointMark.finalizeCheckpoint call? (I'm 
nervous redelivered messages would appear in another window)


You are nervous for a reason. :) I strongly believe a processing-time 
source should be considered an anti-pattern, at least in situations where 
there is any time manipulation downstream (time-windows, stateful 
processing, ...).


- What is the relationship lifecycle-wise between a CheckpointMark 
and a Reader? My understanding is a CheckpointMark may outlive a 
Reader, is that correct?


Definitely. But the same instance, bound to the lifecycle of the reader, 
would be used to call finalizeCheckpoint (if that ever happens).


Thanks for bearing with me everyone. It feels a bit unfortunate my 
first foray into Beam is reliant on this rabbit connector, but I'm 
learning a lot and I'm very grateful for the help. PRs pending once I 
get this all straightened out in my head.


-Danny


Re: RabbitMQ and CheckpointMark feasibility

2019-11-14 Thread Daniel Robert
Alright, thanks everybody. I'm really appreciative of the conversation 
here. I think I see where my disconnect is and how this might all work 
together for me. There are some bugs in the current rabbit 
implementation that I think have confused my understanding of the 
intended semantics. I'm coming around to seeing how such a system with 
rabbit's restrictions can work properly in Beam (I'd totally forgotten 
about 'dedupe' support in Beam) but I want to clarify some 
implementation questions after pulling everyone's notes together.


The RabbitMQ reader should not bother accepting an existing CheckpointMark 
in its constructor (in 'ack-based' systems this is unnecessary per 
Eugene's original reply). It should construct its own CheckpointMark at 
construction time and use it throughout its lifecycle.


At some point later, the CheckpointMark will be 'finalized'. If this 
CheckpointMark has been serialized (via Coder or otherwise) or its 
underlying connection has been severed, this step will fail. This would 
mean at some point the messages are redelivered to Beam on some other 
Reader, so no data loss. If it has not been serialized, the acks will 
take place just fine, even if much later.
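The following toy example shows why finalization fails once a mark has been encoded. It assumes the mark keeps its channel in a `transient` field, as described in the 2019-11-07 message. `FakeChannel`, `AckCheckpointMark`, and `roundTrip` are hypothetical stand-ins, not RabbitMQ client or RabbitMqIO classes. Java serialization is used only because it ships with the JDK; any Coder loses the channel in the same way.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

final class TransientChannelDemo {
  /** Stand-in for a live AMQP channel; it cannot be serialized. */
  static final class FakeChannel {
    final List<Long> acked = new ArrayList<>();

    void basicAck(long deliveryTag) {
      acked.add(deliveryTag);
    }
  }

  /** Hypothetical ack-based mark: delivery tags survive encoding, the channel does not. */
  static final class AckCheckpointMark implements Serializable {
    private static final long serialVersionUID = 1L;
    final List<Long> deliveryTags = new ArrayList<>();
    transient FakeChannel channel;

    AckCheckpointMark(FakeChannel channel) {
      this.channel = channel;
    }

    /** Throwing here models the failure signal suggested in this thread. */
    void finalizeCheckpoint() {
      if (channel == null) {
        throw new IllegalStateException("no channel: this mark was decoded from bytes");
      }
      for (long tag : deliveryTags) {
        channel.basicAck(tag);
      }
      deliveryTags.clear();
    }
  }

  /** Java-serialization round trip, standing in for a runner encoding and decoding the mark. */
  static AckCheckpointMark roundTrip(AckCheckpointMark mark) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(mark);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (AckCheckpointMark) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new RuntimeException(e);
    }
  }
}
```

The original in-memory instance can still ack its tags. The decoded copy still knows which tags were outstanding, but it has nothing to ack them on.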


If the system is using processing-time as event-time, however, the 
redelivery of these messages would effectively change the ordering and 
potentially the window they arrived in. I *believe* Beam deduping 
is managed per-window, so if 'finalizeCheckpoint' is attempted 
(and fails), would these messages appear in a new window?


Perhaps my questions are now:
- how should a CheckpointMark communicate failure to Beam?
- how does Beam handle a CheckpointMark.finalizeCheckpoint failure, if 
the API dictates such a thing?
- is there a provision that would need to be made for processing-time 
sources that can fail a CheckpointMark.finalizeCheckpoint call? (I'm 
nervous redelivered messages would appear in another window)
- What is the relationship lifecycle-wise between a CheckpointMark and a 
Reader? My understanding is a CheckpointMark may outlive a Reader, is 
that correct?


Thanks for bearing with me everyone. It feels a bit unfortunate my first 
foray into Beam is reliant on this rabbit connector, but I'm learning a 
lot and I'm very grateful for the help. PRs pending once I get this all 
straightened out in my head.


-Danny

On 11/14/19 2:35 PM, Eugene Kirpichov wrote:

Hi Daniel,


On Wed, Nov 13, 2019 at 8:26 PM Daniel Robert <daniel.rob...@acm.org> wrote:


I believe I've nailed down a situation that happens in practice
that causes Beam and Rabbit to be incompatible. It seems that
runners can and do make assumptions about the serializability (via
Coder) of a CheckpointMark.

To start, these are the semantics of RabbitMQ:

- the client establishes a connection to the server
- client opens a channel on the connection
- messages are either pulled or pushed to the client from the
server along this channel
- when messages are done processing, they are acknowledged
*client-side* and must be acknowledged on the *same channel* that
originally received the message.

Since a channel (or any open connection) is non-serializable, it
means that a CheckpointMark that has been serialized cannot ever
be used to acknowledge these messages and correctly 'finalize' the
checkpoint. It also, as previously discussed in this thread,
implies a rabbit Reader cannot accept an existing CheckpointMark
at all; the Reader and the CheckpointMark must share the same
connection to the rabbit server ("channel").

This is correct.

Next, I've found how DirectRunner (and presumably others) can
attempt to serialize a CheckpointMark that has not been finalized.
In https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
the DirectRunner applies a probability and if it hits, it sets the
current reader to 'null' but retains the existing CheckpointMark,
which it then attempts to pass to a new reader via a Coder.

Correct, this simulates a failure scenario:
- Runner was reading the source and, after finalizing a bunch of 
previous CheckpointMarks, obtained a new one and serialized it so 
things can be restored in case of failure
- A failure happened before the current CheckpointMark could be 
finalized, which means Beam was not able to guarantee that elements 
after the last-finalized mark have been durably processed; we may 
need to re-read them, so the runner recreates a reader from the current mark.


This leaves the shard, the runner, and the reader with differing
views of the world. In UnboundedReadEvaluatorFactory's
processElement function, a call to getReader(shard) (

https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluat

Re: RabbitMQ and CheckpointMark feasibility

2019-11-14 Thread Daniel Robert
We may be talking past each other a bit, though I do appreciate the 
responses.


Rabbit behaves a lot like a relational database in terms of state 
required. A connection is analogous to a database connection, and a 
channel (poor analogy here) is similar to an open transaction. If the 
connection is severed, the transaction cannot be committed.


In direct response to the consumer lifecycle linked to: yes, one can 
recover and re-establish connections, but any state maintained within 
the previous channel is lost. If there were messages that had not been 
acknowledged, they would be re-delivered to some other consumer, 
as they were never acknowledged.


"Subscription" isn't really the model in rabbit. It has advantages and 
disadvantages when compared with kafka -- mostly out of scope here -- 
but some quick advantages of the rabbit model: 1) it parallelizes 
"infinitely" without any changes to the server (no re-partitioning or the 
like); 2) messages can be acknowledged in a different order than they were 
consumed; 3) because state is tied to an active connection, 
at-least-once delivery semantics are easy to implement, as any 
disconnection will result in the messages being re-placed in the 
queue and delivered to a new consumer. To say it's "incompatible with 
any fault tolerant semantics" is unfair; it just isn't compatible 
with Beam's, as Beam is currently implemented.


Regardless, I'm now wondering what the best path forward is. Rabbit 
isn't unusable in Beam if the set of requirements and tradeoffs is well 
documented. That is, there are use cases that could be properly 
supported and some that likely can't.


One option would be to use a pull-based API and immediately acknowledge 
each message as it arrives. This would effectively make the 
CheckpointMark a no-op, other than maintaining the watermark. In a 
pipeline that uses fixed windows (or non-session windowing) and uses a 
runner that supports 'Drain'-style semantics (like Dataflow) this should 
work just fine I think.
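Here is a minimal sketch of that first option. It assumes the pull-based API is used with acknowledgement on receipt, so the mark only tracks a watermark (here, the latest timestamp seen). `AutoAckReaderSketch` and `WatermarkOnlyMark` are hypothetical names. Because the mark holds no channel and no delivery tags, any runner may encode it freely. The cost is that a message acked here is gone from the broker even if the pipeline fails before processing it.

```java
import java.io.Serializable;
import java.time.Instant;

final class AutoAckReaderSketch {
  /** Hypothetical mark: nothing but a watermark, so encoding it loses nothing. */
  static final class WatermarkOnlyMark implements Serializable {
    private static final long serialVersionUID = 1L;
    final Instant watermark;

    WatermarkOnlyMark(Instant watermark) {
      this.watermark = watermark;
    }

    /** Messages were acked as they were pulled, so there is nothing left to confirm. */
    void finalizeCheckpoint() {}
  }

  private Instant watermark = Instant.EPOCH;

  /** Called for each pulled (and already acknowledged) message. */
  void onMessage(Instant timestamp) {
    if (timestamp.isAfter(watermark)) {
      watermark = timestamp;
    }
  }

  WatermarkOnlyMark getCheckpointMark() {
    return new WatermarkOnlyMark(watermark);
  }
}
```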


Another would be to do a best-attempt at acknowledging as late as 
possible. This would be a hybrid approach where we attempt 
acknowledgements in the CheckpointMark, but use a special Coder that 
acknowledges all messages at the point the CheckpointMark is encoded. I 
think this feels a bit unsafe and overly complex, and I'm not sure it 
solves any real-world problems.
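Here is a sketch of the hybrid idea. It uses a small hypothetical `MarkCoder` interface shaped like the encode/decode pair of Beam's `Coder`, rather than Beam itself. The coder acknowledges every pending delivery tag through the live channel (a `LongConsumer` here) before writing the mark, so the decoded copy never needs a channel. The comment in `encode` marks the unsafety mentioned above: the broker forgets the messages before the runner has durably committed anything.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

final class AckOnEncodeSketch {
  /** Minimal stand-in shaped like a Coder's encode/decode pair (hypothetical here). */
  interface MarkCoder<T> {
    void encode(T value, OutputStream out) throws IOException;

    T decode(InputStream in) throws IOException;
  }

  /** Hypothetical mark: tags still awaiting ack plus a watermark. */
  static final class Mark {
    final List<Long> pendingTags = new ArrayList<>();
    long watermarkMillis;
  }

  /** Acks every pending tag on the live channel, then writes only the watermark. */
  static MarkCoder<Mark> coder(LongConsumer basicAck) {
    return new MarkCoder<Mark>() {
      @Override
      public void encode(Mark mark, OutputStream out) throws IOException {
        for (long tag : mark.pendingTags) {
          basicAck.accept(tag); // side effect: acked before the runner commits anything
        }
        mark.pendingTags.clear();
        DataOutputStream data = new DataOutputStream(out);
        data.writeLong(mark.watermarkMillis);
        data.flush();
      }

      @Override
      public Mark decode(InputStream in) throws IOException {
        Mark mark = new Mark();
        mark.watermarkMillis = new DataInputStream(in).readLong();
        return mark;
      }
    };
  }
}
```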


I also feel like perhaps we should include Beam IO documentation that 
makes it clear that an unbounded source that requires a persistent 
connection for state tracking is not supportable by Beam.


Thanks,
-Danny

On 11/14/19 7:49 AM, Jan Lukavský wrote:

Hi, as I said, I didn't dig too deep into that, but what I saw was [1].
Generally, if RabbitMQ had no way to recover a subscription 
(which I don't think is the case), then it would be incompatible not 
just with Beam, but with any fault-tolerant semantics.


[1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle

On 14. 11. 2019 at 13:06, Daniel Robert wrote:



On 11/14/19 2:32 AM, Jan Lukavský wrote:

Hi Danny,

as Eugene pointed out, there are essentially two "modes of
operation" of CheckpointMark. It can:

 a) be used to somehow restore state of a reader (in call to
UnboundedSource#createReader)

 b) confirm processed elements in
CheckpointMark#finalizeCheckpoint

If your source doesn't provide a persistent position in data
stream that can be referred to (and serialized; an example of
this would be Kafka offsets), then what you actually need to
serialize is not the channel, but a way to restore it,
e.g. by opening a new channel with a given 'consumer group
name'. Then you just use this checkpoint to commit your
processed data in finalizeCheckpoint.

Note that finalizeCheckpoint is not guaranteed to be
called: that can happen when an error occurs and the
source has to be rewound, which is what the direct runner
emulates with the probability of 'readerReuseChance'.

I'm reading the documentation of RabbitMQ very quickly, but if
I understand it correctly, then you have to create a
subscription to the broker, serialize the identifier of the
subscription into the CheckpointMark, and then just recover the
subscription in the call to UnboundedSource#createReader. That
should do the trick.

I have not seen any such documentation in rabbit. My understanding
is it has to be the same, physical connection and channel. Can you
cite the source you were looking at?

-Danny

Hope this helps; sorry if I'm not using 100% correct RabbitMQ
terminology. As I said, I'm not very familiar with it.

Best,

     Jan

    On 11/14/19 5:26 AM, Daniel R

Re: RabbitMQ and CheckpointMark feasibility

2019-11-14 Thread Daniel Robert


On 11/14/19 2:32 AM, Jan Lukavský wrote:


Hi Danny,

as Eugene pointed out, there are essentially two "modes of operation" 
of CheckpointMark. It can:


 a) be used to somehow restore state of a reader (in call to 
UnboundedSource#createReader)


 b) confirm processed elements in CheckpointMark#finalizeCheckpoint

If your source doesn't provide a persistent position in data stream 
that can be referred to (and serialized; an example of this would be 
Kafka offsets), then what you actually need to serialize is not the 
channel, but a way to restore it, e.g. by opening a new channel 
with a given 'consumer group name'. Then you just use this checkpoint 
to commit your processed data in finalizeCheckpoint.


Note that finalizeCheckpoint is not guaranteed to be called: that 
can happen when an error occurs and the source has to be rewound, 
which is what the direct runner emulates with the probability 
of 'readerReuseChance'.


I'm reading the documentation of RabbitMQ very quickly, but if I 
understand it correctly, then you have to create a subscription to the 
broker, serialize the identifier of the subscription into the 
CheckpointMark, and then just recover the subscription in the call to 
UnboundedSource#createReader. That should do the trick.


I have not seen any such documentation in rabbit. My understanding is it 
has to be the same, physical connection and channel. Can you cite the 
source you were looking at?


-Danny

Hope this helps; sorry if I'm not using 100% correct RabbitMQ 
terminology. As I said, I'm not very familiar with it.


Best,

 Jan

On 11/14/19 5:26 AM, Daniel Robert wrote:


I believe I've nailed down a situation that happens in practice that 
causes Beam and Rabbit to be incompatible. It seems that runners can 
and do make assumptions about the serializability (via Coder) of a 
CheckpointMark.


To start, these are the semantics of RabbitMQ:

- the client establishes a connection to the server
- client opens a channel on the connection
- messages are either pulled or pushed to the client from the server 
along this channel
- when messages are done processing, they are acknowledged 
*client-side* and must be acknowledged on the *same channel* that 
originally received the message.


Since a channel (or any open connection) is non-serializable, it 
means that a CheckpointMark that has been serialized cannot ever be 
used to acknowledge these messages and correctly 'finalize' the 
checkpoint. It also, as previously discussed in this thread, implies 
a rabbit Reader cannot accept an existing CheckpointMark at all; the 
Reader and the CheckpointMark must share the same connection to the 
rabbit server ("channel").


Next, I've found how DirectRunner (and presumably others) can attempt 
to serialize a CheckpointMark that has not been finalized. In 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150, 
the DirectRunner applies a probability and if it hits, it sets the 
current reader to 'null' but retains the existing CheckpointMark, 
which it then attempts to pass to a new reader via a Coder.


This leaves the shard, the runner, and the reader with differing views 
of the world. In UnboundedReadEvaluatorFactory's processElement 
function, a call to getReader(shard) ( 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132 
) clones the shard's checkpoint mark and passes that to the new 
reader. The reader ignores it, creating its own, but even if it 
accepted it, it would be accepting a serialized CheckpointMark, which 
wouldn't work. Later, the runner calls finishRead ( 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246 
). The shard's CheckpointMark (unserialized; which should still be 
valid) is finalized. The reader's CheckpointMark (which may be a 
different instance) becomes the return value, which is referred to as 
"finishedCheckpoint" in the calling code, which is misleading at best 
and problematic at worst as *this* checkpoint has not been finalized.


So, tl;dr: I cannot find any means of maintaining a persistent 
connection to the server for finalizing checkpoints that is safe 
across runners. If there's a guarantee all of the shards are on the 
same JVM instance, I could rely on global, static 
collections/instances as a workaround, but if other runners might 
serialize this across the wire, I'm stumped. The only workable 
situation I can think of right now is to proactively acknowledge 
messages as they are received and effectively no-op in 
finalizeCheckpoint. This is very different, semantically, and can 
lead to dropped messages if a pipeline doesn't finish processing the 
given message.


Any help would be much appreciated.

Thanks,
-Danny

Re: RabbitMQ and CheckpointMark feasibility

2019-11-13 Thread Daniel Robert
So you can simply ignore the non-serializability.


On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.rob...@acm.org> wrote:


(Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
As part of this I switched to a pull-based API rather than the
previously-used push-based one. This has caused some nebulous problems, so I
put up a correction PR that I think needs some eyes fairly quickly, as
I'd consider master to be broken for rabbitmq right now. The PR keeps
the upgrade but reverts to the same push-based implementation as in 4.x:
https://github.com/apache/beam/pull/9977 )

Regardless, in trying to get the pull-based API to work, I'm finding the
interactions between rabbitmq and Beam with CheckpointMark to be
fundamentally impossible to implement, so I'm hoping for some input here.

CheckpointMark itself must be Serializable; presumably this means it gets
shuffled around between nodes. However, 'Channel', the tunnel through
which it communicates with Rabbit to ack messages and finalize the
checkpoint, is non-Serializable. As in most other CheckpointMark
implementations, the Channel field is 'transient'. When a new CheckpointMark is
instantiated, it's given a Channel. If an existing one is supplied to
the Reader's constructor (part of the 'startReader()' interface), the
channel is overwritten.

*However*, Rabbit does not support 'ack'ing messages on a channel other
than the one that consumed them in the first place. Attempting to do so
results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
).

Truthfully, I don't really understand how the current implementation is
working; it seems like a happy accident. But I'm curious if someone
could help me debug and implement how to bridge the
re-usable/serializable CheckpointMark requirement in Beam with this
limitation of Rabbit.

Thanks,
-Daniel Robert



Re: RabbitMQ and CheckpointMark feasibility

2019-11-08 Thread Daniel Robert

Thanks, Eugene and Reuven.

In response to Eugene, I'd like to confirm I have this correct: in the 
rabbit-style use case of "stream-system-side checkpointing", it is safe 
(and arguably the correct behavior) to ignore the supplied 
CheckpointMark argument in `createReader(options, checkpointmark)` and 
in the constructor for the Reader, and instead always instantiate a new 
CheckpointMark during construction. Is that correct?


In response to Reuven: noted, however I was mostly using serialization 
in the general sense. That is, there does not seem to be any means of 
deserializing a RabbitMqCheckpointMark such that it can continue to 
provide value to a runner. Whether it's java serialization, avro, or any 
other Coder, the 'channel' itself cannot "come along for the ride", 
which leaves the rest of the internal state mostly unusable except for 
perhaps some historical, immutable use case.


-Danny

On 11/8/19 2:01 AM, Reuven Lax wrote:
Just to clarify one thing: CheckpointMark does not need to be Java 
Serializable. All that's needed is to return a Coder for the 
CheckpointMark in getCheckpointMarkCoder.


On Thu, Nov 7, 2019 at 7:29 PM Eugene Kirpichov <j...@google.com> wrote:


Hi Daniel,

This is probably insufficiently well documented. The
CheckpointMark is used for two purposes:
1) To persistently store some notion of how much of the stream has
been consumed, so that if something fails we can tell the
underlying streaming system where to start reading when we
re-create the reader. This is why CheckpointMark is Serializable.
E.g. this makes sense for Kafka.
2) To do acks - to let the underlying streaming system know that
the Beam pipeline will never need data up to this CheckpointMark.
Acking does not require serializability - runners call ack() on
the same in-memory instance of CheckpointMark that was produced by
the reader. E.g. this makes sense for RabbitMq or Pubsub.

In practice, these two capabilities tend to be mutually exclusive:
some streaming systems can provide a serializable CheckpointMark,
some can do acks, some can do neither - but very few (or none) can
do both, and it's debatable whether it even makes sense for a
system to provide both capabilities: usually acking is an implicit
form of streaming-system-side checkpointing, i.e. when you
re-create the reader you don't actually need to carry over any
information from an old CheckpointMark - the necessary state
(which records should be delivered) is maintained on the streaming
system side.
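The two purposes can be contrasted in a short sketch. Everything here is hypothetical and deliberately simpler than Beam's `UnboundedSource`/`CheckpointMark` API. `OffsetMark` is the Kafka-like case, where the encoded position is exactly what a restored reader needs. `AckMark` is the RabbitMQ-like case: a reader started from a restored mark simply ignores it and relies on the broker to redeliver whatever was never acknowledged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

final class CheckpointRoles {
  /** Purpose 1 (Kafka-like): a position; its value is what must survive encoding. */
  static final class OffsetMark {
    final long nextOffset;

    OffsetMark(long nextOffset) {
      this.nextOffset = nextOffset;
    }
  }

  /** Purpose 2 (RabbitMQ/Pubsub-like): acks; only the live in-memory instance is useful. */
  static final class AckMark {
    final List<Long> tags = new ArrayList<>();
    private final LongConsumer ack;

    AckMark(LongConsumer ack) {
      this.ack = ack;
    }

    void finalizeCheckpoint() {
      for (long tag : tags) {
        ack.accept(tag);
      }
      tags.clear();
    }
  }

  /** An offset-based reader resumes from the restored mark, or from the start if none. */
  static long startOffset(OffsetMark restored) {
    return restored == null ? 0L : restored.nextOffset;
  }

  /** An ack-based reader ignores any restored mark; the broker redelivers unacked messages. */
  static AckMark startAckMark(Object restoredMarkIgnored, LongConsumer ack) {
    return new AckMark(ack);
  }
}
```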

These two are lumped together into one API simply because that was
the best design option we came up with (not for lack of trying,
but suggestions very much welcome - AFAIK nobody is happy with it).

RabbitMQ is under #2 - it can't do serializable checkpoint marks,
but it can do acks. So you can simply ignore the non-serializability.

On Thu, Nov 7, 2019 at 12:07 PM Daniel Robert <daniel.rob...@acm.org> wrote:

(Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library.
As part of this I switched to a pull-based API rather than the
previously-used push-based one. This has caused some nebulous problems, so I
put up a correction PR that I think needs some eyes fairly quickly, as
I'd consider master to be broken for rabbitmq right now. The PR keeps
the upgrade but reverts to the same push-based implementation as in 4.x:
https://github.com/apache/beam/pull/9977 )

Regardless, in trying to get the pull-based API to work, I'm finding the
interactions between rabbitmq and Beam with CheckpointMark to be
fundamentally impossible to implement, so I'm hoping for some input here.

CheckpointMark itself must be Serializable; presumably this means it gets
shuffled around between nodes. However, 'Channel', the tunnel through
which it communicates with Rabbit to ack messages and finalize the
checkpoint, is non-Serializable. As in most other CheckpointMark
implementations, the Channel field is 'transient'. When a new CheckpointMark is
instantiated, it's given a Channel. If an existing one is supplied to
the Reader's constructor (part of the 'startReader()' interface), the
channel is overwritten.

*However*, Rabbit does not support 'ack'ing messages on a channel other
than the one that consumed them in the first place. Attempting to do so
results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See
https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed
).

Truthfully, I don't really understand how the current implementation is
working; it seems lik

RabbitMQ and CheckpointMark feasibility

2019-11-07 Thread Daniel Robert
(Background: I recently upgraded RabbitMqIO from the 4.x to 5.x library. 
As part of this I switched to a pull-based API rather than the 
previously-used push-based one. This has caused some nebulous problems, so I 
put up a correction PR that I think needs some eyes fairly quickly, as 
I'd consider master to be broken for rabbitmq right now. The PR keeps 
the upgrade but reverts to the same push-based implementation as in 4.x: 
https://github.com/apache/beam/pull/9977 )


Regardless, in trying to get the pull-based API to work, I'm finding the 
interactions between rabbitmq and Beam with CheckpointMark to be 
fundamentally impossible to implement, so I'm hoping for some input here.


CheckpointMark itself must be Serializable; presumably this means it gets 
shuffled around between nodes. However, 'Channel', the tunnel through 
which it communicates with Rabbit to ack messages and finalize the 
checkpoint, is non-Serializable. As in most other CheckpointMark 
implementations, the Channel field is 'transient'. When a new CheckpointMark is 
instantiated, it's given a Channel. If an existing one is supplied to 
the Reader's constructor (part of the 'startReader()' interface), the 
channel is overwritten.


*However*, Rabbit does not support 'ack'ing messages on a channel other 
than the one that consumed them in the first place. Attempting to do so 
results in a '406 (PRECONDITION-FAILED) - unknown delivery tag'. (See 
https://www.grzegorowski.com/rabbitmq-406-channel-closed-precondition-failed 
).
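Here is a small in-memory model of the broker behaviour described above. In it, a delivery tag is known only to the channel that delivered it, and a channel that goes away puts its unacknowledged messages back on the queue. `ToyBroker` is hypothetical and only imitates that behaviour. The real client closes the channel with a 406 PRECONDITION_FAILED error rather than throwing `IllegalStateException`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory broker imitating two AMQP rules; not the RabbitMQ client API. */
final class ToyBroker {
  private final Deque<String> queue = new ArrayDeque<>();

  final class Channel {
    private long nextTag = 1; // delivery tags are numbered per channel
    private final Map<Long, String> unacked = new HashMap<>();
    private boolean open = true;

    /** Pull one message without auto-ack; returns its delivery tag, or null if none. */
    Long get() {
      String body = queue.pollFirst();
      if (body == null) {
        return null;
      }
      long tag = nextTag++;
      unacked.put(tag, body);
      return tag;
    }

    /** Rule 1 (toy version): only the delivering channel knows the tag. */
    void ack(long tag) {
      if (!open || unacked.remove(tag) == null) {
        throw new IllegalStateException("406 PRECONDITION_FAILED - unknown delivery tag " + tag);
      }
    }

    /** Rule 2: closing (or losing) a channel requeues everything it had not acked. */
    void close() {
      open = false;
      for (String body : unacked.values()) {
        queue.addFirst(body);
      }
      unacked.clear();
    }
  }

  void publish(String body) {
    queue.addLast(body);
  }

  Channel openChannel() {
    return new Channel();
  }

  int depth() {
    return queue.size();
  }
}
```

With this model, a reader on a fresh channel cannot finalize an old mark. It does not need to, because closing the old channel already returned those messages to the queue for redelivery.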


Truthfully, I don't really understand how the current implementation is 
working; it seems like a happy accident. But I'm curious if someone 
could help me debug and implement how to bridge the 
re-usable/serializable CheckpointMark requirement in Beam with this 
limitation of Rabbit.


Thanks,
-Daniel Robert



RabbitMqIO issues and open PRs

2019-10-31 Thread Daniel Robert
I'm pretty new to the Beam ecosystem, so apologies if this is not the 
right forum for this.


My team has been learning and starting to use Beam for the past few 
months and has run into myriad problems with the RabbitMqIO connector for 
Java, aspects of which seem perhaps fundamentally broken or incorrect in 
the released implementation. I've tracked our significant issues down 
and opened tickets and PRs for them. I'm not certain what the typical 
response time is, but given the severity of the issues (as I perceive 
them), I'd like to escalate some of these PRs and try to get some fixes 
into the next Beam release.


I originally opened BEAM-8390 (https://github.com/apache/beam/pull/9782) 
as it was impossible to set the 'useCorrelationId' property (implying 
this functionality was also untested). Since then, I've found and PR'd 
the following, which are awaiting feedback/approval:


1. Watermarks not advancing

Ticket/PR: BEAM-8347 - https://github.com/apache/beam/pull/9820

Impact: under low message volumes, the watermark never advances and 
windows can never fire 'on time'.


Notes: The RabbitMq UnboundedSource uses the 'oldest known time' as its 
watermark, whereas other similar sources (and the documentation on 
watermarking) state that for monotonically increasing timestamps (the case 
with a queue) it should be the most recent time. I have a few open 
questions about testing and implementation details in the PR, but it 
should work as-is.
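Here is a simplified model of the difference. It assumes that 'oldest known time' behaves like the earliest timestamp the reader has seen, and that timestamps from a single queue never decrease. `WatermarkRules` and `windowCanFire` are hypothetical, and the firing check ignores Beam's actual trigger machinery. With messages at 1 s, 2 s and 3 s, a window ending at 2 s can fire under the most-recent rule but is held back by the oldest-time rule.

```java
import java.time.Instant;

/** Simplified model comparing two watermark rules; not RabbitMqIO's actual code. */
final class WatermarkRules {
  private Instant oldest;
  private Instant latest;

  void onMessage(Instant timestamp) {
    if (oldest == null || timestamp.isBefore(oldest)) {
      oldest = timestamp;
    }
    if (latest == null || timestamp.isAfter(latest)) {
      latest = timestamp;
    }
  }

  /** "Oldest known time": stays at the earliest timestamp seen so far. */
  Instant oldestKnownWatermark() {
    return oldest == null ? Instant.EPOCH : oldest;
  }

  /** Most recent time: valid when timestamps never decrease, as with a single queue. */
  Instant latestSeenWatermark() {
    return latest == null ? Instant.EPOCH : latest;
  }

  /** Simplified firing check: the watermark has reached the window's end. */
  static boolean windowCanFire(Instant watermark, Instant windowEnd) {
    return !watermark.isBefore(windowEnd);
  }
}
```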


2. Exchanges are always declared, which fails if a pre-existing exchange differs


Ticket/PR: BEAM-8513 - https://github.com/apache/beam/pull/9937

Impact: It is impossible to utilize an existing, durable exchange.

Notes: I'm hooking Beam up to an existing topic exchange (an 'event 
bus') that is 'durable'. RabbitMqIO's current implementation will always 
attempt to declare the exchange, and does so as non-durable, which 
causes rabbit to fail the declaration. (Interestingly qpid does not fail 
in this scenario.) The PR allows the caller to disable declaring the 
exchange, similar to `withQueueDeclare` for declaring a queue.
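Here is a toy model of both the failure and the opt-out. `ExchangeSetup`, its `declareExchange` flag, and the in-memory registry are hypothetical, not the PR's API. The model imitates one broker behaviour: it rejects redeclaring an existing exchange with different properties. RabbitMQ reports this as a 406 PRECONDITION_FAILED ('inequivalent arg'). Skipping the declaration leaves the existing durable exchange untouched.

```java
import java.util.HashMap;
import java.util.Map;

final class ExchangeSetup {
  /** Hypothetical broker-side registry: exchange name -> durable flag. */
  static final class ToyExchangeRegistry {
    private final Map<String, Boolean> durableByName = new HashMap<>();

    void declare(String name, boolean durable) {
      Boolean existing = durableByName.putIfAbsent(name, durable);
      if (existing != null && existing != durable) {
        throw new IllegalStateException(
            "PRECONDITION_FAILED - inequivalent arg 'durable' for exchange '" + name + "'");
      }
    }

    boolean isDurable(String name) {
      return Boolean.TRUE.equals(durableByName.get(name));
    }
  }

  /** Hypothetical connector setup step; declares (as non-durable) only when asked to. */
  static void setUp(ToyExchangeRegistry broker, String exchange, boolean declareExchange) {
    if (declareExchange) {
      broker.declare(exchange, false); // the always-non-durable declaration described above
    }
    // Otherwise the exchange is assumed to exist already and is used as-is.
  }
}
```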


This PR also calls out a lot of the documentation that seems misleading, 
implying that one either interacts with queues *or* exchanges when that 
is not how AMQP fundamentally operates. The implementation of the 
RabbitMqIO connector before this PR seems like it probably works with 
the default exchange and maybe a fanout exchange, but not a topic exchange.
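Some AMQP background shows why 'queues *or* exchanges' is misleading. Every message is published to an exchange (publishing "to a queue" goes through the default exchange). A topic exchange routes each message to every queue whose binding pattern matches the dot-separated routing key. In a pattern, `*` matches exactly one word and `#` matches zero or more words. The self-contained matcher below illustrates those rules only. It is not code from RabbitMqIO or from the broker, and it does not special-case edge cases such as empty routing keys.

```java
final class TopicMatcher {
  /** True if the routing key matches the binding pattern under topic-exchange rules. */
  static boolean matches(String pattern, String routingKey) {
    String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
    String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
    return match(p, 0, k, 0);
  }

  private static boolean match(String[] p, int i, String[] k, int j) {
    if (i == p.length) {
      return j == k.length; // pattern used up: key must be used up too
    }
    if (p[i].equals("#")) {
      // '#' absorbs zero or more words: try every possible resume point.
      for (int next = j; next <= k.length; next++) {
        if (match(p, i + 1, k, next)) {
          return true;
        }
      }
      return false;
    }
    if (j == k.length) {
      return false; // pattern still needs a word, key has none left
    }
    boolean wordMatches = p[i].equals("*") || p[i].equals(k[j]);
    return wordMatches && match(p, i + 1, k, j + 1);
  }
}
```

For example, `orders.*` matches `orders.created` but not `orders.created.eu`, while `orders.#` matches both and also plain `orders`.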


3. Library versions

Tickets/PR: BEAM-7434, BEAM-5895, and BEAM-5894 - 
https://github.com/apache/beam/pull/9900


Impact: The RabbitMQ AMQP client for Java released the 5.x line in 
September of 2017. Some automated tickets were opened to upgrade, plus a 
manual ticket to drop the use of the deprecated QueueingConsumer API.


Notes: The upgrade was relatively simple, but I implemented it using a 
pull-based API rather than push-based (Consumer) which may warrant some 
discussion. I'm used to discussing this type of thing over PRs but I'm 
happy to do whatever the community prefers.
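One common way to keep a push-based (Consumer) client underneath a pull-shaped reader is to buffer deliveries in a thread-safe queue. The reader's `advance()` should return quickly and `getCurrent()` returns the current element. Below is a sketch of that shape, with the hypothetical `onDelivery` standing in for the client's delivery callback. It is not the 4.x or 5.x RabbitMqIO implementation.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical adapter from push-style deliveries to a pull-style reader loop. */
final class PushToPullBuffer {
  /** One delivery: the body plus the tag that must later be acked on the same channel. */
  static final class Delivery {
    final long tag;
    final String body;

    Delivery(long tag, String body) {
      this.tag = tag;
      this.body = body;
    }
  }

  private final LinkedBlockingQueue<Delivery> buffer = new LinkedBlockingQueue<>();
  private Delivery current;

  /** Push side: invoked from the client library's consumer thread for each message. */
  void onDelivery(long tag, String body) {
    buffer.add(new Delivery(tag, body));
  }

  /** Pull side, in the spirit of a reader's advance(): never blocks; false if nothing is buffered. */
  boolean advance() {
    current = buffer.poll();
    return current != null;
  }

  Delivery getCurrent() {
    if (current == null) {
      throw new NoSuchElementException("advance() has not returned true");
    }
    return current;
  }
}
```

The buffer decouples the client's delivery thread from the runner's polling. The delivery tags still belong to the channel the consumer was registered on, so acks must go back through that same channel.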


---

Numbers 1 and 2 above are 'dealbreaker' issues for my team. They 
effectively make rabbitmq unusable as an unbounded source, forcing 
developers to fork and modify the code. Number 3 is much less 
significant and I've put it here more for 'good, clean living' than an 
urgent issue.


Aside from the open issues, given the low response rate so far, I'd be 
more than happy to take on a more active role in maintaining/reviewing 
RabbitMqIO for Java. For now, however, is this list the best way to 
'bump' these open issues and move forward? Further, is the general 
approach to ask some preliminary questions on this mailing list before 
opening a PR?


Thank you,
-Daniel Robert