Coders are all set up by the SDK before the pipeline is given to a runner,
so that sounds like a strange issue. Would you also file a Jira ticket
about your experience with the coder registry and the DataflowRunner?

On Fri, May 24, 2019 at 5:26 AM Nicolas Delsaux wrote:

> Thanks, I've opened a PR (https://github.com/apache/beam/pull/8677) and
> set both Alexey and you as potential reviewers.
>
> On 24/05/2019 at 13:55, Jean-Baptiste Onofré wrote:
> > Hi,
> >
> > You can create a pull request; I will do the review.
> >
> > The coder is set on the RabbitMqIO PTransform, so it should work.
> >
> > AFAIR, we have a Jira ticket about that, and I have already started
> > looking into it, but I haven't finished yet.
> >
> > Regards
> > JB
> >
> > On 24/05/2019 11:01, Nicolas Delsaux wrote:
> >> Hi all
> >>
> >> I'm currently evaluating Apache Beam to transfer messages from RabbitMQ
> >> to Kafka, with some transforms in between.
> >>
> >> In doing so, I've discovered some differences between the behaviour of
> >> the direct runner and that of the Google Dataflow runner.
> >>
> >> But first, a short summary of what I know.
> >>
> >> From what I understand, elements passed between two different
> >> transforms are serialized and deserialized.
> >>
> >> This (de)serialization is normally handled by a Coder, the most
> >> commonly used being SerializableCoder, which takes a Serializable
> >> object and (de)serializes it using standard Java serialization.
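
SerializableCoder essentially delegates to standard Java serialization. A
minimal, JDK-only sketch of that round trip (no Beam on the classpath)
looks roughly like this; SimpleMessage and the encode/decode helpers are
hypothetical names for illustration, not Beam APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class JavaSerializationRoundTrip {

    // Hypothetical message type; every field is itself Serializable.
    static final class SimpleMessage implements Serializable {
        private static final long serialVersionUID = 1L;
        final String routingKey;
        final HashMap<String, String> headers;

        SimpleMessage(String routingKey, HashMap<String, String> headers) {
            this.routingKey = routingKey;
            this.headers = headers;
        }
    }

    // Encode an object to bytes with standard Java serialization.
    static byte[] encode(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Decode bytes produced by encode() back into an object.
    static Object decode(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, String> headers = new HashMap<>();
        headers.put("content-type", "text/plain");
        SimpleMessage original = new SimpleMessage("orders", headers);

        SimpleMessage copy = (SimpleMessage) decode(encode(original));
        // prints: orders {content-type=text/plain}
        System.out.println(copy.routingKey + " " + copy.headers);
    }
}
```

This only works because every reachable field implements Serializable.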
> >>
> >> On the direct runner, I had issues with RabbitMQ messages, as their
> >> headers contain LongString objects. LongString is an interface
> >> implemented only by a private static class in the RabbitMQ client,
> >> and it is used for large text messages.
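
To illustrate why such headers can break Java serialization, here is a
JDK-only sketch. OpaqueLongString is a hypothetical stand-in for the
RabbitMQ client's private LongString implementation and is assumed, as the
issue suggests, not to implement Serializable; ObjectOutputStream then
throws NotSerializableException as soon as it reaches that header value.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class NonSerializableHeaderDemo {

    // Hypothetical stand-in for a LongString implementation:
    // it wraps bytes but does NOT implement Serializable.
    static final class OpaqueLongString {
        final byte[] bytes;

        OpaqueLongString(byte[] bytes) {
            this.bytes = bytes;
        }
    }

    // A Serializable message whose header map may hold arbitrary values.
    static final class Message implements Serializable {
        private static final long serialVersionUID = 1L;
        final HashMap<String, Object> headers = new HashMap<>();
    }

    // Returns true if Java serialization of the message succeeds.
    static boolean isJavaSerializable(Message message) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(message);
            return true;
        } catch (NotSerializableException e) {
            // Raised when the serializer reaches the OpaqueLongString value.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Message plain = new Message();
        plain.headers.put("x-count", 3);
        System.out.println(isJavaSerializable(plain)); // true

        Message withLongString = new Message();
        withLongString.headers.put("x-body", new OpaqueLongString("large text".getBytes()));
        System.out.println(isJavaSerializable(withLongString)); // false
    }
}
```

The containing Message being Serializable is not enough: Java
serialization walks the whole object graph, so one opaque header value is
enough to make the message unencodable by a SerializableCoder.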
> >>
> >> So I wrote a RabbitMqMessageCoder and registered it in my pipeline
> >> (using
> >> pipeline.getCoderRegistry().registerCoderForClass(RabbitMqMessage.class,
> >> new RabbitMqMessageCoder())).
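
A custom coder sidesteps Java serialization by writing each field
explicitly. The JDK-only sketch below mirrors the shape of the
encode(value, OutputStream) / decode(InputStream) pair that a Beam
CustomCoder subclass overrides. The Message type, the length-prefixed
layout, and converting header values with String.valueOf() are
illustrative assumptions, not what RabbitMqMessageCoder or RabbitMqIO
actually do.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderMessageCoderSketch {

    // Hypothetical message type: a raw body plus headers with arbitrary values.
    static final class Message {
        final byte[] body;
        final Map<String, Object> headers;

        Message(byte[] body, Map<String, Object> headers) {
            this.body = body;
            this.headers = headers;
        }
    }

    // Length-prefixed bytes: avoids writeUTF's 64 KB limit and tells the
    // reader exactly where each field ends.
    private static void writeBytes(DataOutputStream out, byte[] bytes) throws IOException {
        out.writeInt(bytes.length);
        out.write(bytes);
    }

    private static byte[] readBytes(DataInputStream in) throws IOException {
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        return bytes;
    }

    // Writes the message field by field instead of using Java serialization.
    // Header values are stored as UTF-8 text via String.valueOf(), so a
    // non-Serializable value only needs a meaningful toString().
    static void encode(Message value, OutputStream outStream) throws IOException {
        DataOutputStream out = new DataOutputStream(outStream);
        writeBytes(out, value.body);
        out.writeInt(value.headers.size());
        for (Map.Entry<String, Object> header : value.headers.entrySet()) {
            writeBytes(out, header.getKey().getBytes(StandardCharsets.UTF_8));
            writeBytes(out, String.valueOf(header.getValue()).getBytes(StandardCharsets.UTF_8));
        }
        out.flush(); // flush, but leave closing the caller's stream to the caller
    }

    // Reads the fields back in the same order; header values come back as Strings.
    static Message decode(InputStream inStream) throws IOException {
        DataInputStream in = new DataInputStream(inStream);
        byte[] body = readBytes(in);
        int headerCount = in.readInt();
        Map<String, Object> headers = new LinkedHashMap<>();
        for (int i = 0; i < headerCount; i++) {
            String key = new String(readBytes(in), StandardCharsets.UTF_8);
            String val = new String(readBytes(in), StandardCharsets.UTF_8);
            headers.put(key, val);
        }
        return new Message(body, headers);
    }

    public static void main(String[] args) throws IOException {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("x-count", 3);
        headers.put("x-note", new StringBuilder("large text")); // not a String
        Message original = new Message("hello".getBytes(StandardCharsets.UTF_8), headers);

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        encode(original, buffer);
        Message copy = decode(new ByteArrayInputStream(buffer.toByteArray()));

        // prints: hello {x-count=3, x-note=large text}
        System.out.println(new String(copy.body, StandardCharsets.UTF_8) + " " + copy.headers);
    }
}
```

In a Beam pipeline, the same two methods would live in a CustomCoder
subclass registered through the coder registry as described above.
Returning every header value as a String is a deliberate simplification;
a real coder might instead tag each value with its type.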
> >>
> >> And it worked! Well, not on the Dataflow runner.
> >>
> >>
> >> Indeed, it seems that the Dataflow runner doesn't use this coder
> >> registry mechanism (for reasons I really don't understand).
> >>
> >> So my fix didn't work.
> >>
> >> After several attempts, I finally gave up and modified the RabbitMqIO
> >> class (from Apache Beam) directly to handle my case.
> >>
> >> This fix is available in my Beam fork on GitHub, and I would like to
> >> have it integrated.
> >>
> >> What is the procedure for doing so?
> >>
> >> Thanks!
> >>
>
