[DISCUSS] flink-connector-rabbitmq api changes
Dear All,

I want to propose a change to the current RabbitMQ connector.

Currently, RMQSource extracts the body of each message, which is a byte array, and passes it to an instance of a user implementation of DeserializationSchema to deserialize it. It also uses the correlation ID from the message properties to deduplicate messages.

What I want to propose is that, instead of taking a DeserializationSchema implementation in the RMQSource constructor, the user implements an interface with methods that produce both the output of the RMQSource and the ID used for deduplication, based not only on the body of the message but also on its metadata and properties. This would give the connector much more power and flexibility.

This would of course be a breaking API change for RMQSource, since it would no longer take a DeserializationSchema but an implementation of a predefined interface with methods to extract both the output of the RMQSource and the unique message ID.

The reason behind this is that at my company we rely on another property, the message ID, to deduplicate messages. I also needed that information further down the pipeline, and there was no way to get it other than modifying RMQSource.

I already have code written, but as the rules dictate, I have to run it by you first before I attempt to create a Jira ticket :)

Let me know what you think.

Regards,
Karim Mansour
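To make the proposal above more concrete, here is a minimal, self-contained Java sketch of the kind of interface it describes. All names (`Delivery`, `DeliveryParser`, `Event`, `MessageIdParser`, `DedupingConsumer`) are hypothetical; they are not Flink classes and not the code from Karim's branch. `Delivery` is a simplified stand-in for the body and AMQP properties the RabbitMQ client hands to the source, and the in-memory `HashSet` stands in for whatever deduplication state the real source keeps.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for what the RabbitMQ client delivers: body plus a few properties.
record Delivery(byte[] body, String correlationId, String messageId, Map<String, Object> headers) {}

// Hypothetical user-facing interface: one method builds the emitted record,
// the other picks the ID used for deduplication. Both see the whole delivery.
interface DeliveryParser<T> {
    T parse(Delivery delivery);

    String extractUniqueId(Delivery delivery);
}

// Output type that carries the message ID further down the pipeline.
record Event(String messageId, String payload) {}

// Example user implementation: deduplicate on the message ID instead of the correlation ID.
final class MessageIdParser implements DeliveryParser<Event> {
    @Override
    public Event parse(Delivery delivery) {
        return new Event(delivery.messageId(), new String(delivery.body(), StandardCharsets.UTF_8));
    }

    @Override
    public String extractUniqueId(Delivery delivery) {
        return delivery.messageId();
    }
}

// Simplified consume loop: emits each delivery at most once per unique ID.
final class DedupingConsumer {
    static <T> List<T> consume(List<Delivery> deliveries, DeliveryParser<T> parser) {
        Set<String> seenIds = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (Delivery d : deliveries) {
            // Set.add returns false if the ID was already seen, so duplicates are skipped.
            if (seenIds.add(parser.extractUniqueId(d))) {
                out.add(parser.parse(d));
            }
        }
        return out;
    }
}
```

The design point is that deduplication and output construction both receive the full message, so the message ID can serve as the deduplication key and still travel downstream inside `Event`.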
Re: [DISCUSS] flink-connector-rabbitmq api changes
Hello,

I am new to the mailing list and to contributing to big open-source projects in general, and I don't know if I did something wrong or should just be more patient :)

I started a discussion topic almost a week ago, as per the contribution guide (https://flink.apache.org/contributing/how-to-contribute.html). Since what I propose is not backward compatible, it needs to be discussed here before opening a ticket and moving forward.

So my question is: will someone pick the discussion up? Or at least tell me that this is not the way to go? Or should I assume from the silence that it's not important or relevant to the project? Should I track down the author of the connector and contact him directly?

Thank you for your time.

Regards,
Karim Mansour

On Fri, Apr 24, 2020 at 11:17 AM seneg...@gmail.com wrote:
> Dear All,
>
> I want to propose a change to the current RabbitMQ connector.
Re: [DISCUSS] flink-connector-rabbitmq api changes
Hello Guys,

Thanks for all the responses. I want to stress that I didn't feel ignored; I just thought that I had forgotten an important step or something.

Since I am a newbie, I will follow whatever route you suggest :) and I agree that the RMQ connector still needs a lot of love (which I would be happy to contribute gradually).

As for the code, I have it here in the PR: https://github.com/senegalo/flink/pull/1. It's not much of a change in terms of logic, but more in terms of what is exposed.

Let me know how you want me to proceed.

Thanks again,
Karim Mansour

On Thu, Apr 30, 2020 at 10:40 AM Aljoscha Krettek wrote:
> Hi,
>
> I think it's good to contribute the changes to Flink directly since we
> already have the RMQ connector in the repository.
>
> I would propose something similar to the Kafka connector, which takes
> both the generic DeserializationSchema and a KafkaDeserializationSchema
> that is specific to Kafka and allows access to the ConsumerRecord and
> therefore all the Kafka features. What do you think about that?
>
> Best,
> Aljoscha
>
> On 30.04.20 10:26, Robert Metzger wrote:
> > Hey Karim,
> >
> > I'm sorry that you had such a bad experience contributing to Flink, even
> > though you are nicely following the rules.
> >
> > You mentioned that you've implemented the proposed change already. Could
> > you share a link to a branch here so that we can take a look? I can assess
> > the API changes more easily if I see them :)
> >
> > Thanks a lot!
> >
> > Best,
> > Robert
> >
> > On Thu, Apr 30, 2020 at 8:09 AM Dawid Wysakowicz wrote:
> >
> >> Hi Karim,
> >>
> >> Sorry you did not have the best first-time experience. You certainly did
> >> everything right, which I definitely appreciate.
> >>
> >> The problem in this particular case, as I see it, is that RabbitMQ is
> >> not very actively maintained and therefore it is not easy to find a
> >> committer willing to take on this topic.
> >> The point of connectors not
> >> being properly maintained was raised a few times in the past on the ML.
> >> One of the ideas for improving the situation was to start a
> >> https://flink-packages.org/ page. The idea is to ask active users of
> >> certain connectors to maintain those connectors outside of the core
> >> project, while giving them a platform within the community where they
> >> can make their modules visible. That way it is possible to overcome the
> >> limited capacity of the core committers without losing much
> >> visibility.
> >>
> >> I would kindly ask you to consider that path, if you are interested. You
> >> can of course also wait for or reach out to more committers if you feel
> >> strongly about contributing those changes back to the Flink repository itself.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 30/04/2020 07:29, seneg...@gmail.com wrote:
> >>> Hello,
> >>>
> >>> I am new to the mailing list and to contributing to big open-source
> >>> projects in general, and I don't know if I did something wrong or should
> >>> just be more patient :)
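Aljoscha's Kafka-style suggestion quoted above can be sketched as follows: keep a constructor that accepts a body-only schema, add one that accepts an RMQ-specific schema with access to the whole message, and implement the old path by adapting the body-only schema into the new one. This is a hypothetical sketch, not Flink code: `BodyDeserializer` only mirrors the shape of Flink's `DeserializationSchema#deserialize(byte[])`, and `RmqMessage`, `RmqMessageDeserializer` and `SketchRmqSource` are illustrative names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Stand-in with the same shape as Flink's DeserializationSchema#deserialize(byte[]).
interface BodyDeserializer<T> {
    T deserialize(byte[] body);

    // Convenience schema used in the example: decode the body as UTF-8 text.
    static BodyDeserializer<String> utf8() {
        return body -> new String(body, StandardCharsets.UTF_8);
    }
}

// Hypothetical stand-in for a RabbitMQ message (body + properties).
record RmqMessage(byte[] body, String correlationId, Map<String, Object> headers) {}

// Hypothetical RMQ-specific schema, analogous to KafkaDeserializationSchema:
// it sees the whole message, not just the body.
interface RmqMessageDeserializer<T> {
    T deserialize(RmqMessage message);

    // Adapter: the old body-only schema becomes a special case of the new one.
    static <T> RmqMessageDeserializer<T> ofBody(BodyDeserializer<T> bodySchema) {
        return message -> bodySchema.deserialize(message.body());
    }
}

// Hypothetical source with both constructors, so existing user code keeps compiling.
final class SketchRmqSource<T> {
    private final RmqMessageDeserializer<T> deserializer;

    // Existing-style constructor: body-only schema, adapted internally.
    SketchRmqSource(BodyDeserializer<T> bodySchema) {
        this(RmqMessageDeserializer.ofBody(bodySchema));
    }

    // New constructor: the schema sees the body and the message properties.
    SketchRmqSource(RmqMessageDeserializer<T> deserializer) {
        this.deserializer = deserializer;
    }

    // Called once per delivery in this sketch.
    T handle(RmqMessage message) {
        return deserializer.deserialize(message);
    }
}
```

One caveat of overloading a constructor on two single-method interfaces: an implicitly typed lambda passed directly to the constructor is ambiguous, so callers would assign it to a typed variable first or use an explicitly typed lambda such as `(byte[] b) -> ...`.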
Re: [DISCUSS] flink-connector-rabbitmq api changes
Hello,

So the proposal is to keep the current RMQSource constructors / public API as is and create new ones that give more granular parsing?

Regards,
Karim Mansour

On Thu, Apr 30, 2020 at 5:23 PM Austin Cawley-Edwards <aus...@fintechstudios.com> wrote:
> Hey all + thanks Konstantin,
>
> As mentioned, we also run into issues with the RMQ source's inflexibility.
> I think Aljoscha's idea of supporting both would be a nice way to
> incorporate new changes without breaking the current API.
>
> We'd definitely benefit from the changes proposed here but have another
> issue with the correlation ID. When a message gets into the queue without a
> correlation ID, the source errors and the job cannot recover, requiring
> (painful) manual intervention. It would be nice to be able to dead-letter
> these inputs from the source, but I don't think that's possible with the
> current source interface (I don't know too much about the source specifics).
> We might be able to work around this with a custom correlation ID
> extractor, as proposed by Karim.
>
> Also, if there are other tickets in the RMQ integrations that have gone
> unmaintained, I'm also happy to chip in at maintaining them!
>
> Best,
> Austin
>
> From: Konstantin Knauf
> Sent: Thursday, April 30, 2020 6:14 AM
> To: dev
> Cc: Austin Cawley-Edwards
> Subject: Re: [DISCUSS] flink-connector-rabbitmq api changes
>
> Hi everyone,
>
> just looping in Austin, as he mentioned to me yesterday that they also ran
> into issues due to the inflexibility of the RMQSource.
>
> Cheers,
>
> Konstantin
>
> On Thu, Apr 30, 2020 at 11:23 AM seneg...@gmail.com wrote:
> Hello Guys,
>
> Thanks for all the responses. I want to stress that I didn't feel
> ignored; I just thought that I had forgotten an important step or something.
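Austin's idea of working around missing correlation IDs with a custom extractor could look roughly like the sketch below. It assumes a hypothetical `CorrelationIdExtractor` that returns an empty `Optional` instead of throwing, with a fallback from the correlation ID to the message ID; messages that still have no usable ID are collected in a dead-letter list rather than failing the job. `IncomingMessage`, `CorrelationIdExtractor` and `DeadLetteringHandler` are illustrative names only, not part of the connector.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for an incoming message; null means "property not set".
record IncomingMessage(String body, String correlationId, String messageId) {}

// Hypothetical pluggable extractor: signals a missing ID with Optional.empty() instead of throwing.
interface CorrelationIdExtractor {
    Optional<String> extract(IncomingMessage message);

    // Prefer the correlation ID; fall back to the message ID if it is absent.
    static CorrelationIdExtractor correlationThenMessageId() {
        return m -> Optional.ofNullable(m.correlationId())
                .or(() -> Optional.ofNullable(m.messageId()));
    }
}

// Routes messages without any usable ID to a dead-letter list instead of failing.
final class DeadLetteringHandler {
    private final CorrelationIdExtractor extractor;
    final List<String> emitted = new ArrayList<>();
    final List<IncomingMessage> deadLetters = new ArrayList<>();

    DeadLetteringHandler(CorrelationIdExtractor extractor) {
        this.extractor = extractor;
    }

    void handle(IncomingMessage message) {
        Optional<String> id = extractor.extract(message);
        if (id.isPresent()) {
            // The sketch just tags the payload with its ID; a real source would emit a record.
            emitted.add(id.get() + ":" + message.body());
        } else {
            deadLetters.add(message);
        }
    }
}
```

In a real source, the dead-letter branch might instead reject the delivery without requeueing it, so that a dead-letter exchange configured on the queue picks it up; that is an assumption about the deployment, not something the current connector does.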
Re: [DISCUSS] flink-connector-rabbitmq api changes
Hi,

Okay, so we keep the current constructors as is and create new ones with more granular parsing of the results. Sounds like a good plan.

How do we proceed from here?

Regards,
Karim Mansour

On Fri, May 1, 2020 at 5:03 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
> Hey,
>
> (Switching to my personal email)
>
> Correct me if I'm wrong, but I think Aljoscha is proposing keeping the
> public API as is, and adding some new constructors / custom deserialization
> schemas as was done with Kafka. Here's what I was able to find on that
> feature:
>
> * https://issues.apache.org/jira/browse/FLINK-8354
> * https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java
> * https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java#L100-L114
>
> Best,
> Austin
>
> On Fri, May 1, 2020 at 6:19 AM seneg...@gmail.com wrote:
> > Hello,
> >
> > So the proposal is to keep the current RMQSource constructors / public API
> > as is and create new ones that give more granular parsing?
Re: [DISCUSS] flink-connector-rabbitmq api changes
Hi,

Okay, I created a ticket: https://issues.apache.org/jira/browse/FLINK-17502

I will work on the modifications (keeping the old constructor), brush up on the contribution guides, and move on from there :)

Regards,
Karim Mansour

On Mon, May 4, 2020 at 10:00 AM Aljoscha Krettek wrote:
> Yes, that's what I was proposing!
>
> @Karim If there's not already a Jira issue, please create one. You can
> ping me, so that I can assign you.
>
> @Austin There's a Jira component for the RMQ source, maybe you can take
> a stab at some of the issues there:
> https://issues.apache.org/jira/browse/FLINK-17204?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Connectors%2F%20RabbitMQ%22%20AND%20statusCategory%20!%3D%20Done
>
> Best,
> Aljoscha
>
> On 03.05.20 16:38, seneg...@gmail.com wrote:
> > Hi,
> >
> > Okay, so we keep the current constructors as is and create new ones with more
> > granular parsing of the results. Sounds like a good plan.
Re: [DISCUSS] flink-connector-rabbitmq api changes
@Austin In my initial implementation you get the envelope as well. I basically pass to the interface everything I get from the RMQ client: https://github.com/senegalo/flink/blob/e67f344884b4186126c38eaa8e112d6e5cf1152e/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeliveryParser.java#L26

Regards,
Karim Mansour

On Tue, May 5, 2020 at 12:38 AM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
> Thanks Aljoscha,
>
> I'm happy to take FLINK-17204
> <https://issues.apache.org/jira/browse/FLINK-17204> for now, if you're able
> to assign it to me, and we'll go from there?
>
> Excited to use what you come up with, Karim! It also looks like FLINK-8510
> <https://issues.apache.org/jira/browse/FLINK-8510> might have some
> ideas on getting access to more RMQ-specific data in the source.
>
> Best,
> Austin
>
> On Mon, May 4, 2020 at 6:58 AM seneg...@gmail.com wrote:
> > Hi,
> >
> > Okay, I created a ticket: https://issues.apache.org/jira/browse/FLINK-17502
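Passing "everything I get from the RMQ client" corresponds roughly to the arguments of the RabbitMQ Java client's `Consumer#handleDelivery(consumerTag, envelope, properties, body)`. The sketch below mirrors that shape with simplified stand-in records (`EnvelopeInfo`, `MessageProperties`) so it runs without the client library. `FullDeliveryParser`, `RoutedEvent`, `RoutingAwareParser` and the `tenant` header are hypothetical; they only illustrate the kind of envelope data (routing key, redelivery flag) that a body-only schema cannot see.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Stand-ins for a few fields of the RabbitMQ client's Envelope and AMQP.BasicProperties.
record EnvelopeInfo(long deliveryTag, boolean redeliver, String exchange, String routingKey) {}

record MessageProperties(String correlationId, String messageId, Map<String, Object> headers) {}

// Hypothetical parser that receives envelope, properties and body together,
// in the same order as the client's handleDelivery callback.
interface FullDeliveryParser<T> {
    T parse(EnvelopeInfo envelope, MessageProperties properties, byte[] body);
}

// Output that keeps routing metadata next to the payload.
record RoutedEvent(String routingKey, boolean redelivered, String messageId, String tenant, String payload) {}

final class RoutingAwareParser implements FullDeliveryParser<RoutedEvent> {
    @Override
    public RoutedEvent parse(EnvelopeInfo envelope, MessageProperties properties, byte[] body) {
        // Hypothetical "tenant" header; missing headers fall back to "unknown".
        // String.valueOf is used because header values are not necessarily Java Strings.
        Object tenant = properties.headers().getOrDefault("tenant", "unknown");
        return new RoutedEvent(
                envelope.routingKey(),
                envelope.redeliver(),
                properties.messageId(),
                String.valueOf(tenant),
                new String(body, StandardCharsets.UTF_8));
    }
}
```

Handing the parser all three pieces keeps the source itself generic: which metadata matters (message ID, routing key, custom headers) is decided entirely in user code.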