Re: a fix to send RabbitMq messages

2019-05-24 Thread Kenneth Knowles
Coders are all set up by the SDK before the pipeline is given to a runner,
so that sounds like a strange issue. Would you also file a Jira ticket
about your experience with the coder registry and the DataflowRunner?
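(For anyone else hitting this in the meantime, here is a possible workaround,
sketched and untested, where MyCoder is the custom coder from the original
post: set the coder explicitly on the PCollection, so nothing depends on
registry lookup.)

    // Fragment, not a full pipeline: pin the coder on the PCollection
    // produced by the read instead of registering it for the class.
    // The connection settings here are hypothetical.
    PCollection<RabbitMqMessage> messages =
        pipeline
            .apply(RabbitMqIO.read()
                .withUri("amqp://localhost:5672")
                .withQueue("my-queue"))
            .setCoder(new MyCoder());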

On Fri, May 24, 2019 at 5:26 AM Nicolas Delsaux wrote:

> Thanks, the PR is started (https://github.com/apache/beam/pull/8677), and
> I've set both Alexey and you as potential reviewers.
>
> On 24/05/2019 at 13:55, Jean-Baptiste Onofré wrote:
> > Hi,
> >
> > You can create a Pull Request, and I will do the review.
> >
> > The coder is set on the RabbitMqIO PTransform, so it should work.
> >
> > AFAIR, we have a Jira about that, and I have already started to check.
> > It's not completed yet.
> >
> > Regards
> > JB
> >
> > On 24/05/2019 11:01, Nicolas Delsaux wrote:
> >> Hi all
> >>
> >> I'm currently evaluating Apache Beam to transfer messages from RabbitMq
> >> to Kafka with some transforms in between.
> >>
> >> Doing so, I've discovered some differences between the direct runner's
> >> behaviour and the Google Dataflow runner's.
> >>
> >> But first, a small introduction to what I know.
> >>
> >> From what I understand, elements transmitted between two different
> >> transforms are serialized/deserialized.
> >>
> >> This (de)serialization process is normally managed by a Coder, of which
> >> the most used is obviously the SerializableCoder, which takes a
> >> serializable object and (de)serializes it using classical Java
> >> mechanisms.
> >>
> >> On the direct runner, I had issues with RabbitMq messages, as their
> >> headers contain LongString objects, an interface implemented solely in a
> >> private static class of the RabbitMq client and used for large text
> >> messages.
> >>
> >> So I wrote a RabbitMqMessageCoder and installed it in my pipeline using
> >> pipeline.getCoderRegistry().registerCoderForClass(RabbitMqMessage.class,
> >> new MyCoder()).
> >>
> >> And it worked! Well, not on the Dataflow runner.
> >>
> >> Indeed, it seems the Dataflow runner doesn't use this coder registry
> >> mechanism (for reasons I absolutely don't understand).
> >>
> >> So my fix didn't work.
> >>
> >> After various tries, I finally gave up and directly modified the
> >> RabbitMqIO class (from Apache Beam) to handle my case.
> >>
> >> This fix is available on my Beam fork on GitHub, and I would like to
> >> have it integrated.
> >>
> >> What is the procedure to do so?
> >>
> >> Thanks!
> >>
>


Re: How to setup portable flink runner with remote flink cluster

2019-05-24 Thread Kyle Weaver
Have you tried running without setting the environmentType flag?
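(Some context on why that flag matters, as I understand it: with
environment_type=LOOPBACK, each Flink taskmanager tries to dial back to an SDK
worker pool on the submitting machine, which matches the "Connection refused:
localhost/127.0.0.1:45955" at the bottom of the trace; the taskmanagers on B
and C look for it on their own localhost. Leaving the flag unset falls back to
the DOCKER environment, where each taskmanager host starts the SDK harness
container itself, so Docker must be available on B and C. A rough Java-side
sketch of the submission options, assuming the option names of recent Beam
releases and the default job service port:)

    // Fragment, untested: submit through the portable job service without
    // forcing LOOPBACK, so the environment type falls back to DOCKER.
    PortablePipelineOptions options =
        PipelineOptionsFactory.fromArgs(
                "--runner=PortableRunner",
                "--jobEndpoint=D:8099") // job service started by runShadow
            .as(PortablePipelineOptions.class);
    Pipeline p = Pipeline.create(options);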

On Fri, May 24, 2019 at 8:24 AM 青雉(祁明良)  wrote:

> Hi all,
>
> This is Mingliang. I followed the documentation to set up a Beam pipeline
> on a local Flink cluster, but when I switch to a remote Flink cluster it
> doesn't work directly, and I can't find many materials about this.
>
> Firstly, I don't know if I have to install anything on the Flink
> jobmanager/taskmanager machines; so far I have only set up the Flink
> cluster itself, say the jobmanager on machine A and two taskmanagers on
> machines B and C.
>
> Next, I installed the apache_beam Python package on machine D and started
> the Beam job service endpoint on machine D with: ./gradlew
> beam-runners-flink_2.11-job-server:runShadow -PflinkMasterUrl=A:8081
>
> Then I submitted the word_count example downloaded from Beam to the job
> service endpoint on machine D with environment_type = LOOPBACK.
>
> The error message I received is attached below.
>
> Any help will be appreciated.
> Thanks,
> Mingliang
>
> - Versions
> Beam: release-2.12.0
> Flink: 1.5.6
>
> - Log
> 
> Caused by: java.lang.Exception: The user defined 'open()' method caused an
> exception:
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException:
> UNAVAILABLE: io exception
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> ... 1 more
> Caused by:
> org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException:
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException:
> UNAVAILABLE: io exception
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4992)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:186)
> at
> org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext.getStageBundleFactory(FlinkDefaultExecutableStageContext.java:49)
> at
> org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingFlinkExecutableStageContextFactory.java:203)
> at
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:143)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:494)
> ... 3 more
> Caused by:
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException:
> UNAVAILABLE: io exception
> at
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:222)
> at
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:203)
> at
> org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:132)
> at
> org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.notifyRunnerAvailable(BeamFnExternalWorkerPoolGrpc.java:152)
> at
> org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory.createEnvironment(ExternalEnvironmentFactory.java:109)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:178)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:161)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
> at
> org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
> ... 13 more
> Caused by:
> org.apache.beam.vendor.grpc.v1p13p1.io.netty.channel.AbstractChannel$AnnotatedConnectException:
> Connection refused: localhost/127.0.0.1:45955
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> at
> 

How to setup portable flink runner with remote flink cluster

2019-05-24 Thread 青雉(祁明良)
Hi all,

This is Mingliang. I followed the documentation to set up a Beam pipeline on
a local Flink cluster, but when I switch to a remote Flink cluster it
doesn't work directly, and I can't find many materials about this.

Firstly, I don't know if I have to install anything on the Flink
jobmanager/taskmanager machines; so far I have only set up the Flink cluster
itself, say the jobmanager on machine A and two taskmanagers on machines B
and C.

Next, I installed the apache_beam Python package on machine D and started
the Beam job service endpoint on machine D with: ./gradlew
beam-runners-flink_2.11-job-server:runShadow -PflinkMasterUrl=A:8081

Then I submitted the word_count example downloaded from Beam to the job
service endpoint on machine D with environment_type = LOOPBACK.

The error message I received is attached below.

Any help will be appreciated.
Thanks,
Mingliang

- Versions
Beam: release-2.12.0
Flink: 1.5.6

- Log

Caused by: java.lang.Exception: The user defined 'open()' method caused an 
exception: org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException: 
UNAVAILABLE: io exception
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
... 1 more
Caused by: 
org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.UncheckedExecutionException:
 org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException: 
UNAVAILABLE: io exception
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2214)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.get(LocalCache.java:4053)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4057)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4986)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4992)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:186)
at 
org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext.getStageBundleFactory(FlinkDefaultExecutableStageContext.java:49)
at 
org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingFlinkExecutableStageContextFactory.java:203)
at 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:143)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:494)
... 3 more
Caused by: org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException: 
UNAVAILABLE: io exception
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:222)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:203)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:132)
at 
org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc$BeamFnExternalWorkerPoolBlockingStub.notifyRunnerAvailable(BeamFnExternalWorkerPoolGrpc.java:152)
at 
org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory.createEnvironment(ExternalEnvironmentFactory.java:109)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:178)
at 
org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:161)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3628)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2336)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2295)
at 
org.apache.beam.vendor.guava.v20_0.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2208)
... 13 more
Caused by: 
org.apache.beam.vendor.grpc.v1p13p1.io.netty.channel.AbstractChannel$AnnotatedConnectException:
 Connection refused: localhost/127.0.0.1:45955
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325)
at 
org.apache.beam.vendor.grpc.v1p13p1.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
at 

Performance of Wait.on (and side inputs in general) in dataflow

2019-05-24 Thread Steve Niemitz
Hi everyone.

I've been debugging a streaming job (on dataflow) for a little while now,
and seem to have tracked it down to the Wait.on transform that I'm using.

Some background: our pipeline takes in ~250,000 messages/sec from pubsub,
aggregates them in an hourly window, and then emits the results.  The final
output from the combiner is ~20,000,000 keys (emitted at the end of the
hour).

These results are tee'd: they get written somewhere, and the keys are also
sent into a Wait.on transform, where they wait for a signal before being
written to pubsub.
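(A rough sketch of that topology, where the DoFn and topic are hypothetical
stand-ins rather than our real job:)

    // Sketch only: tee the combiner output, writing it somewhere and using
    // that write as the signal that gates the pubsub emit.
    static class WriteSomewhereFn extends DoFn<String, Void> {
      @ProcessElement
      public void process(@Element String key) {
        // write the key to the primary sink (placeholder body)
      }
    }

    static void tee(PCollection<String> keys) {
      PCollection<Void> written =
          keys.apply("WriteSomewhere", ParDo.of(new WriteSomewhereFn()));
      keys.apply(Wait.on(written)) // hold keys until the signal's window closes
          .apply("ToPubsub", PubsubIO.writeStrings().to("projects/p/topics/out"));
    }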

If I let this pipeline run for a couple of hours, the input processing rate
eventually drops below the rate of messages going into the queue, and I'll
get a bunch of deadline_exceeded errors from windmill for one specific
worker.  At this point the pipeline is basically unrecoverable and needs to
be restarted.

If I remove the wait transform, everything works great.

My back-of-the-envelope calculation is that the elements going into the
wait transform are ~1 GB total, so it's not a huge input either.  My guess
is there's some kind of O(n^2) operation happening here, because this same
pipeline does work fairly reliably with a smaller key space (~100,000 to 1
million keys).

The other interesting thing I've noticed is that the stage is constantly
processing operations even with no messages coming into it (e.g., in my
lower-scale case, the windmill status page shows the stage with ~100,000
active operations/second "succeeding", even though no messages are going
into the stage, since it's not the end of the hour). It has also written 10
GB of data, although the dataflow UI says that only 500 MB of data has gone
into the wait transform.

It seems like there might just be a bug here, but in the interim, is there
any way to construct something similar to the Wait transform without using
side inputs?  My first guess was possibly a CoGroupByKey, along the lines of
the sketch below?
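(A minimal sketch of that CoGroupByKey idea, untested and with hypothetical
types: emit a key's data only once its "done" signal has arrived in the same
window.)

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.transforms.join.CoGroupByKey;
    import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;

    class GateOnSignal {
      /** Emits data values only for keys whose signal arrived in the same window. */
      static PCollection<String> expand(
          PCollection<KV<String, String>> data,
          PCollection<KV<String, Boolean>> signal) {
        final TupleTag<String> dataTag = new TupleTag<String>() {};
        final TupleTag<Boolean> signalTag = new TupleTag<Boolean>() {};
        return KeyedPCollectionTuple.of(dataTag, data)
            .and(signalTag, signal)
            .apply(CoGroupByKey.<String>create())
            .apply(ParDo.of(
                new DoFn<KV<String, CoGbkResult>, String>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    CoGbkResult joined = c.element().getValue();
                    // Emit only if a signal element exists for this key/window.
                    if (joined.getAll(signalTag).iterator().hasNext()) {
                      for (String v : joined.getAll(dataTag)) {
                        c.output(v);
                      }
                    }
                  }
                }));
      }
    }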

Thanks!


Re: a fix to send RabbitMq messages

2019-05-24 Thread Nicolas Delsaux

Thanks, the PR is started (https://github.com/apache/beam/pull/8677), and
I've set both Alexey and you as potential reviewers.

On 24/05/2019 at 13:55, Jean-Baptiste Onofré wrote:

Hi,

You can create a Pull Request, and I will do the review.

The coder is set on the RabbitMqIO PTransform, so it should work.

AFAIR, we have a Jira about that, and I have already started to check. It's
not completed yet.

Regards
JB

On 24/05/2019 11:01, Nicolas Delsaux wrote:

Hi all

I'm currently evaluating Apache Beam to transfer messages from RabbitMq
to Kafka with some transforms in between.

Doing so, I've discovered some differences between the direct runner's
behaviour and the Google Dataflow runner's.

But first, a small introduction to what I know.

From what I understand, elements transmitted between two different
transforms are serialized/deserialized.

This (de)serialization process is normally managed by a Coder, of which
the most used is obviously the SerializableCoder, which takes a
serializable object and (de)serializes it using classical Java mechanisms.

On the direct runner, I had issues with RabbitMq messages, as their headers
contain LongString objects, an interface implemented solely in a private
static class of the RabbitMq client and used for large text messages.

So I wrote a RabbitMqMessageCoder and installed it in my pipeline using
pipeline.getCoderRegistry().registerCoderForClass(RabbitMqMessage.class,
new MyCoder()).

And it worked! Well, not on the Dataflow runner.

Indeed, it seems the Dataflow runner doesn't use this coder registry
mechanism (for reasons I absolutely don't understand).

So my fix didn't work.

After various tries, I finally gave up and directly modified the
RabbitMqIO class (from Apache Beam) to handle my case.

This fix is available on my Beam fork on GitHub, and I would like to
have it integrated.

What is the procedure to do so?

Thanks!



Re: a fix to send RabbitMq messages

2019-05-24 Thread Jean-Baptiste Onofré
Hi,

You can create a Pull Request, and I will do the review.

The coder is set on the RabbitMqIO PTransform, so it should work.

AFAIR, we have a Jira about that, and I have already started to check. It's
not completed yet.

Regards
JB

On 24/05/2019 11:01, Nicolas Delsaux wrote:
> Hi all
> 
> I'm currently evaluating Apache Beam to transfer messages from RabbitMq
> to Kafka with some transforms in between.
> 
> Doing so, I've discovered some differences between the direct runner's
> behaviour and the Google Dataflow runner's.
> 
> But first, a small introduction to what I know.
> 
> From what I understand, elements transmitted between two different
> transforms are serialized/deserialized.
> 
> This (de)serialization process is normally managed by a Coder, of which
> the most used is obviously the SerializableCoder, which takes a
> serializable object and (de)serializes it using classical Java mechanisms.
> 
> On the direct runner, I had issues with RabbitMq messages, as their headers
> contain LongString objects, an interface implemented solely in a private
> static class of the RabbitMq client and used for large text messages.
> 
> So I wrote a RabbitMqMessageCoder and installed it in my pipeline using
> pipeline.getCoderRegistry().registerCoderForClass(RabbitMqMessage.class,
> new MyCoder()).
> 
> And it worked! Well, not on the Dataflow runner.
> 
> Indeed, it seems the Dataflow runner doesn't use this coder registry
> mechanism (for reasons I absolutely don't understand).
> 
> So my fix didn't work.
> 
> After various tries, I finally gave up and directly modified the
> RabbitMqIO class (from Apache Beam) to handle my case.
> 
> This fix is available on my Beam fork on GitHub, and I would like to
> have it integrated.
> 
> What is the procedure to do so?
> 
> Thanks!
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Beam Summit Europe: speakers and schedule online!

2019-05-24 Thread Aljoscha Krettek
You're both right. The Kulturbrauerei area is between Knaackstraße and
Schönhauser Allee, and there are entrances on multiple sides. Schönhauser
Allee is the more prominent street, though.

Btw, I live on Schönhauser Allee. :-)

> On 24. May 2019, at 13:48, Suneel Marthi  wrote:
> 
> Kulturbrauerei is on Schönhauser Allee; you have the address wrong on the
> event page.
> 
> On Thu, May 23, 2019 at 4:58 PM Joana Filipa Bernardo Carrasqueira
> <joanafil...@google.com> wrote:
> Hi all! 
> 
> Looking forward to the conversations about Beam and to meeting new people
> in the community!
> 
> Please help us spread the word about the Beam Summit within your networks,
> and register for the event here.
> 
> See you all soon!
> Joana
> 
> 
> On Thu, May 23, 2019 at 6:24 AM Matthias Baetens
> <baetensmatth...@gmail.com> wrote:
> Hi everyone,
> 
> Happy to share that the speakers and schedule are now online on the
> website.
> 
> Make sure you register on Eventbrite if you want to attend, and follow our
> Twitter channel for announcements regarding the speakers over the next few
> weeks!
> 
> Best regards,
> Matthias
> 
> 
> -- 
> Joana Carrasqueira
> Cloud Developer Relations Events Manager
> +1 415-602-2507
> 1160 N Mathilda Ave, Sunnyvale, CA 94089
> 
> 



Re: Beam Summit Europe: speakers and schedule online!

2019-05-24 Thread Suneel Marthi
Kulturbrauerei is on Schönhauser Allee; you have the address wrong on the
event page.

On Thu, May 23, 2019 at 4:58 PM Joana Filipa Bernardo Carrasqueira <
joanafil...@google.com> wrote:

> Hi all!
>
> Looking forward to the conversations about Beam and to meeting new people
> in the community!
>
> Please help us spread the word about the Beam Summit within your networks,
> and register for the event here.
>
> See you all soon!
> Joana
>
>
> On Thu, May 23, 2019 at 6:24 AM Matthias Baetens <
> baetensmatth...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> Happy to share that the speakers and schedule are now online on the
>> website.
>>
>> Make sure you register on Eventbrite if you want to attend, and follow
>> our Twitter channel for announcements regarding the speakers over the
>> next few weeks!
>>
>> Best regards,
>> Matthias
>>
>
>
> --
>
> *Joana Carrasqueira*
>
> Cloud Developer Relations Events Manager
>
> +1 415-602-2507
>
> 1160 N Mathilda Ave, Sunnyvale, CA 94089
>
>
>


Re: a fix to send RabbitMq messages

2019-05-24 Thread Alexey Romanenko
Hi Nicolas,

Thank you for your contribution. I'd recommend starting with this page:
https://beam.apache.org/contribute/#share-your-intent

I'll be glad to review your PR once it is ready.

> On 24 May 2019, at 11:01, Nicolas Delsaux  wrote:
> 
> Hi all
> 
> I'm currently evaluating Apache Beam to transfer messages from RabbitMq
> to Kafka with some transforms in between.
> 
> Doing so, I've discovered some differences between the direct runner's
> behaviour and the Google Dataflow runner's.
> 
> But first, a small introduction to what I know.
> 
> From what I understand, elements transmitted between two different
> transforms are serialized/deserialized.
> 
> This (de)serialization process is normally managed by a Coder, of which
> the most used is obviously the SerializableCoder, which takes a
> serializable object and (de)serializes it using classical Java mechanisms.
> 
> On the direct runner, I had issues with RabbitMq messages, as their headers
> contain LongString objects, an interface implemented solely in a private
> static class of the RabbitMq client and used for large text messages.
> 
> So I wrote a RabbitMqMessageCoder and installed it in my pipeline using
> pipeline.getCoderRegistry().registerCoderForClass(RabbitMqMessage.class,
> new MyCoder()).
> 
> And it worked! Well, not on the Dataflow runner.
> 
> Indeed, it seems the Dataflow runner doesn't use this coder registry
> mechanism (for reasons I absolutely don't understand).
> 
> So my fix didn't work.
> 
> After various tries, I finally gave up and directly modified the
> RabbitMqIO class (from Apache Beam) to handle my case.
> 
> This fix is available on my Beam fork on GitHub, and I would like to
> have it integrated.
> 
> What is the procedure to do so?
> 
> Thanks!
> 



a fix to send RabbitMq messages

2019-05-24 Thread Nicolas Delsaux

Hi all

I'm currently evaluating Apache Beam to transfer messages from RabbitMq
to Kafka with some transforms in between.

Doing so, I've discovered some differences between the direct runner's
behaviour and the Google Dataflow runner's.

But first, a small introduction to what I know.

From what I understand, elements transmitted between two different
transforms are serialized/deserialized.

This (de)serialization process is normally managed by a Coder, of which
the most used is obviously the SerializableCoder, which takes a
serializable object and (de)serializes it using classical Java mechanisms.

On the direct runner, I had issues with RabbitMq messages, as their headers
contain LongString objects, an interface implemented solely in a private
static class of the RabbitMq client and used for large text messages.

So I wrote a RabbitMqMessageCoder and installed it in my pipeline using
pipeline.getCoderRegistry().registerCoderForClass(RabbitMqMessage.class,
new MyCoder()).

And it worked! Well, not on the Dataflow runner.

Indeed, it seems the Dataflow runner doesn't use this coder registry
mechanism (for reasons I absolutely don't understand).

So my fix didn't work.

After various tries, I finally gave up and directly modified the
RabbitMqIO class (from Apache Beam) to handle my case.

This fix is available on my Beam fork on GitHub, and I would like to
have it integrated.

What is the procedure to do so?

Thanks!
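
(For readers following this thread, a minimal, self-contained illustration of
the register-a-custom-coder pattern described above. The types here are
hypothetical stand-ins rather than Beam's RabbitMqMessage, and the snippet is
untested.)

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.CustomCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Hypothetical element type standing in for RabbitMqMessage.
    class MyEvent {
      final String payload;
      MyEvent(String payload) { this.payload = payload; }
    }

    // A coder we control: encode/decode through StringUtf8Coder instead of
    // relying on Java serialization of every header object.
    class MyEventCoder extends CustomCoder<MyEvent> {
      private static final StringUtf8Coder STRING = StringUtf8Coder.of();

      @Override
      public void encode(MyEvent value, OutputStream out) throws IOException {
        STRING.encode(value.payload, out);
      }

      @Override
      public MyEvent decode(InputStream in) throws IOException {
        return new MyEvent(STRING.decode(in));
      }
    }

    public class CoderRegistration {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
        // Registration, as described in the post above.
        pipeline.getCoderRegistry()
            .registerCoderForClass(MyEvent.class, new MyEventCoder());
      }
    }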