[GitHub] [flink] senegalo commented on a change in pull request #12056: [FLINK-17502] [flink-connector-rabbitmq] RMQSource refactor

2020-10-14 Thread GitBox


senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r504691988



##
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##
@@ -268,54 +304,141 @@ public void run(SourceContext ctx) throws Exception {
}
}
 
-   private class RMQCollector implements Collector {
+   @Override
+   public void cancel() {
+   running = false;
+   }
+
+   @Override
+   protected void acknowledgeSessionIDs(List<Long> sessionIds) {
+   try {
+   for (long id : sessionIds) {
+   channel.basicAck(id, false);
+   }
+   channel.txCommit();
+   } catch (IOException e) {
+   throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
+   }
+   }
+
+   @Override
+   public TypeInformation<OUT> getProducedType() {
+   return deliveryDeserializer.getProducedType();
+   }
 
+   /**
+* Special collector for RMQ messages.
+* Captures the correlation ID and delivery tag, and also does the filtering logic for whether a message has been
+* processed or not.
+*/
+   private class RMQCollectorImpl implements RMQDeserializationSchema.RMQCollector<OUT> {

Review comment:
   I think it's fine; it's just missing a reset method that resets `customIdentifiersSet`, `correlationId` and `deliveryTag` whenever a new message is delivered.
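A minimal sketch of what such a reset could look like, assuming a collector instance that is reused across deliveries. `ReusableRmqCollector` and its accessors are hypothetical names for illustration; only `customIdentifiersSet`, `correlationId` and `deliveryTag` come from the comment above.

```java
/**
 * Hypothetical sketch (not the PR's RMQCollectorImpl): a collector reused across deliveries
 * that must forget the previous message's identifiers before the next delivery is processed.
 */
class ReusableRmqCollector {
    private boolean customIdentifiersSet = false;
    private String correlationId = null;
    private long deliveryTag = 0L;

    /** Assumed to be called by the source right before each new delivery is deserialized. */
    void reset() {
        customIdentifiersSet = false;
        correlationId = null;
        deliveryTag = 0L;
    }

    void setMessageIdentifiers(String correlationId, long deliveryTag) {
        this.customIdentifiersSet = true;
        this.correlationId = correlationId;
        this.deliveryTag = deliveryTag;
    }

    boolean isCustomIdentifiersSet() {
        return customIdentifiersSet;
    }

    String getCorrelationId() {
        return correlationId;
    }

    long getDeliveryTag() {
        return deliveryTag;
    }
}
```

Without the `reset()` call, a deserializer that forgets to call `setMessageIdentifiers` for the next delivery would silently reuse the previous delivery's identifiers in this sketch.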





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] senegalo commented on a change in pull request #12056: [FLINK-17502] [flink-connector-rabbitmq] RMQSource refactor

2020-10-14 Thread GitBox


senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r504682583



##
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public abstract class RMQDeserializationSchema<T> implements Serializable, ResultTypeQueryable<T> {

Review comment:
   Done. I actually didn't know the syntax, my bad.









[GitHub] [flink] senegalo commented on a change in pull request #12056: [FLINK-17502] [flink-connector-rabbitmq] RMQSource refactor

2020-10-04 Thread GitBox


senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r499228712



##
File path: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
##
@@ -219,7 +221,7 @@ public void testCheckpointing() throws Exception {
 
assertEquals(numIds, messageIds.size());
if (messageIds.size() > 0) {
-   assertTrue(messageIds.contains(Long.toString(lastSnapshotId)));
+   assertTrue(messageIds.contains(Long.toString(lastSnapshotId - 1)));

Review comment:
   I spent 2 hours tracking down the answer to this one, as I had completely forgotten it! I had to check the operator state stored and retrieved between upstream/master and origin. It was a fun ride!
   Here is the answer:
   
   The `messageId` gets incremented every time `Envelope.getDeliveryTag` is called on the mocked `Envelope`: https://github.com/apache/flink/blob/b9c536699ab90573a07e283f815e77ee60e03143/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java#L607-L612
   
   `messageId` is also used as the `correlationID`: https://github.com/apache/flink/blob/b9c536699ab90573a07e283f815e77ee60e03143/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java#L618-L623
   
   So if you get the `correlationID` before calling `getDeliveryTag`, you get `correlationID: 0, deliveryTag: 1`; in the opposite order you get `correlationID: 1, deliveryTag: 1`.
   
   In the [previous code](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L248-L250) `getDeliveryTag` was called before `getCorrelationID`, so the state stored [here](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L254) starts at `1`; that state is later retrieved and checked for its presence in [testCheckpointing](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java#L213-L223).
   
   In the new code `getDeliveryTag` is called after `getCorrelationId` (https://github.com/apache/flink/blob/b9c536699ab90573a07e283f815e77ee60e03143/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L286), which makes the stored state start at `0` instead of `1`.
   
   This is why I am subtracting `1` from the value: the list now starts at `0`, not `1`.
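The order dependence can be reproduced without Mockito. In this sketch, `CountingDeliveryMock` is a hypothetical hand-written stand-in for the mocked `Envelope` and `AMQP.BasicProperties`, assuming the mocked correlation ID simply returns the current counter value while the delivery tag pre-increments it.

```java
/** Hypothetical stand-in for the mocked Envelope and BasicProperties in RMQSourceTest. */
class CountingDeliveryMock {
    private long messageId = 0; // shared counter, like the test's messageId field

    /** Like the mocked Envelope#getDeliveryTag: increments the shared counter on every call. */
    long getDeliveryTag() {
        return ++messageId;
    }

    /** Like the mocked BasicProperties#getCorrelationId (assumed): reads the counter without changing it. */
    String getCorrelationId() {
        return Long.toString(messageId);
    }

    /** Read order of the previous RMQSource code: delivery tag first. */
    static String tagFirst(CountingDeliveryMock mock) {
        long deliveryTag = mock.getDeliveryTag();
        String correlationId = mock.getCorrelationId();
        return "correlationID: " + correlationId + ", deliveryTag: " + deliveryTag;
    }

    /** Read order of the refactored code: correlation ID first. */
    static String correlationFirst(CountingDeliveryMock mock) {
        String correlationId = mock.getCorrelationId();
        long deliveryTag = mock.getDeliveryTag();
        return "correlationID: " + correlationId + ", deliveryTag: " + deliveryTag;
    }
}
```

With the refactored read order the stored correlation IDs start at `0` rather than `1`, which is the shift the `- 1` in the assertion compensates for.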













[GitHub] [flink] senegalo commented on a change in pull request #12056: [FLINK-17502] [flink-connector-rabbitmq] RMQSource refactor

2020-10-04 Thread GitBox


senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r499214594



##
File path: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
##
@@ -318,6 +320,36 @@ public void testConstructorParams() throws Exception {
assertEquals("passTest", testObj.getFactory().getPassword());
}
 
+   /**
+* Tests getting the correct body and correlation ID depending on which constructor was called.
+* If the constructor with the {@link DeserializationSchema} was called, it should extract the body of the message
+* from the {@link Delivery} and the correlation ID from the {@link AMQP.BasicProperties}, which are
+* mocked to "I Love Turtles" and "0".
+* If the constructor with the {@link RMQDeserializationSchema} was called, it uses the
+* {@link RMQDeserializationSchema#deserialize} method to parse the message and extract the correlation ID, both of which
+* are implemented in {@link RMQTestSource#initAMQPMocks()} to return the
+* {@link AMQP.BasicProperties#getMessageId()}, which is mocked to return "1-MESSAGE_ID".
+*/
+   @Test
+   public void testProcessMessage() throws Exception {

Review comment:
   Fixed all warnings except the unchecked-generics ones; apparently that's a known issue: https://github.com/mockito/mockito/issues/1531









[GitHub] [flink] senegalo commented on a change in pull request #12056: [FLINK-17502] [flink-connector-rabbitmq] RMQSource refactor

2020-07-12 Thread GitBox


senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r453306517



##
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+   /**
+* This method takes all the RabbitMQ delivery information supplied by the client, extracts the data and passes it to the
+* collector.
+* NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with
+* the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled
+* in the {@link RMQSource}.
+* @param envelope
+* @param properties
+* @param body
+* @throws IOException
+*/
+   public void processMessage(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector<T> collector) throws IOException;
+
+   /**
+* Method to decide whether the element signals the end of the stream. If true is returned, the element won't be emitted.
+*
+* @param nextElement The element to test for the end-of-stream signal.
+* @return True, if the element signals end of stream, false otherwise.
+*/
+   public boolean isEndOfStream(T nextElement);
+
+   /**
+* The {@link TypeInformation} for the deserialized T.
+* As an example, the proper implementation of this method if T is a String is:
+* {@code return TypeExtractor.getForClass(String.class)}
+* @return TypeInformation
+*/
+   public TypeInformation<T> getProducedType();
+
+
+   /**
+* Special collector for RMQ messages.
+* Captures the correlation ID and delivery tag, and also does the filtering logic for whether a message has been
+* processed or not.
+* @param <T>
+*/
+   public interface RMQCollector<T> extends Collector<T> {
+   public void collect(List<T> records);
+
+   public void setMessageIdentifiers(String correlationId, long deliveryTag);
+   }

Review comment:
   I think I finally understand your point.
   You're saying that if one received AMQP message creates N records for the collector (1:N), those N records could have different correlation IDs, and there should be a way for them to be reset somehow?
   
   But then:
   The suggestion above will not work, since we create a new instance of the collector for every message we get, so the reset is kind of implied there.
   
   Also, you can achieve 1:N with multiple correlation IDs by just calling `setMessageIdentifiers` multiple times.
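A minimal sketch of that 1:N case, assuming a hypothetical body format in which each line carries its own `id:payload` pair. `DedupCollector` and `LineSplittingDeserializer` are illustrative names, and the collector only models the correlation-ID de-duplication; delivery-tag bookkeeping is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical collector: only models the correlation-ID de-duplication. */
class DedupCollector {
    private final Set<String> processedIds = new HashSet<>();
    private Boolean acceptRecords; // null until setMessageIdentifiers has been called
    final List<String> emitted = new ArrayList<>();

    void setMessageIdentifiers(String correlationId, long deliveryTag) {
        // Set.add returns false when the ID was already seen, i.e. the record is a redelivery.
        acceptRecords = processedIds.add(correlationId);
    }

    void collect(String record) {
        if (acceptRecords == null) {
            throw new IllegalStateException("setMessageIdentifiers must be called before collect");
        }
        if (acceptRecords) {
            emitted.add(record);
        }
    }
}

/** Hypothetical deserializer: one delivery body with "id:payload" lines yields N records. */
class LineSplittingDeserializer {
    void deserialize(long deliveryTag, byte[] body, DedupCollector collector) {
        for (String line : new String(body, StandardCharsets.UTF_8).split("\n")) {
            int sep = line.indexOf(':');
            // A new correlation ID for every record produced from the same delivery.
            collector.setMessageIdentifiers(line.substring(0, sep), deliveryTag);
            collector.collect(line.substring(sep + 1));
        }
    }
}
```

One thing worth double-checking against the diff: its `setMessageIdentifiers` also appends the delivery tag to `sessionIds` on every call, so N calls for one delivery would queue the same tag N times for acknowledgement.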
   
   Let me know if I got it right... Also, this is my last working week, then I have 2 weeks off: I'll be "safely" travelling during 1 of them :D and during the other one I intend to be creative and finish the 100 projects I am working on right now. One of the main topics is closing this PR, so I'll be like a car on the Autobahn... with no speed limits. Vrooom Vroom









[GitHub] [flink] senegalo commented on a change in pull request #12056: [FLINK-17502] [flink-connector-rabbitmq] RMQSource refactor

2020-06-28 Thread GitBox


senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r446638002



##
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##
@@ -239,25 +294,75 @@ public void run(SourceContext ctx) throws Exception {
}
}
 
-   private class RMQCollector implements Collector<OUT> {
-
+   /**
+* Special collector for RMQ messages.
+* Captures the correlation ID and delivery tag, and also does the filtering logic for whether a message has been
+* processed or not.
+*/
+   private class RMQCollectorImpl implements RMQDeserializationSchema.RMQCollector<OUT> {
private final SourceContext<OUT> ctx;
private boolean endOfStreamSignalled = false;
+   private long deliveryTag;
+   private Boolean preCheckFlag;
 
-   private RMQCollector(SourceContext<OUT> ctx) {
+   private RMQCollectorImpl(SourceContext<OUT> ctx) {
this.ctx = ctx;
}
 
@Override
public void collect(OUT record) {
-   if (endOfStreamSignalled || schema.isEndOfStream(record)) {
-   this.endOfStreamSignalled = true;
+   Preconditions.checkNotNull(preCheckFlag, "setMessageIdentifiers must be called at least once before " +
+   "calling this method!");
+
+   if (!preCheckFlag) {
return;
}
 
+   if (isEndOfStream(record)) {
+   this.endOfStreamSignalled = true;
+   return;
+   }
ctx.collect(record);
}
 
+   public void collect(List<OUT> records) {
+   Preconditions.checkNotNull(preCheckFlag, "setMessageIdentifiers must be called at least once before " +
+   "calling this method!");
+
+   if (!preCheckFlag) {
+   return;
+   }
+
+   for (OUT record : records) {
+   if (isEndOfStream(record)) {
+   this.endOfStreamSignalled = true;
+   return;
+   }
+   ctx.collect(record);
+   }
+   }
+
+   public void setMessageIdentifiers(String correlationId, long deliveryTag) {
+   preCheckFlag = true;
+   if (!autoAck) {
+   if (usesCorrelationId) {
+   Preconditions.checkNotNull(correlationId, "RabbitMQ source was instantiated " +
+   "with usesCorrelationId set to true yet we couldn't extract the correlation id from it!");
+   if (!addId(correlationId)) {
+   // we have already processed this message
+   preCheckFlag = false;
+   }
+   }
+   sessionIds.add(deliveryTag);
+   }
+   }

Review comment:
   That was actually my initial approach, but here is what I ran into that made me split the validation logic and write the code the way I did.
   
   Given that we might also be calling `collect(List<T> records)`, we would either have to:
   * copy-paste the validation logic so that it is also checked there, or
   * make `collect(List<T> records)` call the `collect(T record)` method internally to add single records, but then the validation code would be called multiple times, yielding the same result each time.
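The trade-off can be sketched with a hypothetical `ValidatingListCollector` that counts how often the validation runs. `setPreCheck(boolean)` is an illustrative simplification of `setMessageIdentifiers`, and end-of-stream handling is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the options discussed above; not the PR's RMQCollectorImpl. */
class ValidatingListCollector {
    final List<String> emitted = new ArrayList<>();
    int validations = 0;          // how many times the identifier check ran
    private Boolean preCheckFlag; // null until the identifiers have been supplied

    void setPreCheck(boolean notYetProcessed) {
        preCheckFlag = notYetProcessed;
    }

    /** The shared check, kept in one place instead of being copy-pasted. */
    private boolean validate() {
        validations++;
        if (preCheckFlag == null) {
            throw new IllegalStateException("identifiers must be set before collecting");
        }
        return preCheckFlag;
    }

    void collect(String record) {
        if (validate()) {
            emitted.add(record);
        }
    }

    /** Second bullet: delegate to collect(T), so the check runs once per record. */
    void collectByDelegation(List<String> records) {
        for (String record : records) {
            collect(record);
        }
    }

    /** Approach taken in the diff: check once, then emit every record. */
    void collectValidatedOnce(List<String> records) {
        if (!validate()) {
            return;
        }
        emitted.addAll(records);
    }
}
```

Pulling the check into one private helper, as `validate()` does here, is one way to avoid both the copy-paste and the repeated evaluation.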









[GitHub] [flink] senegalo commented on a change in pull request #12056: [FLINK-17502] [flink-connector-rabbitmq] RMQSource refactor

2020-06-28 Thread GitBox


senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r446636396



##
File path: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
##
@@ -382,6 +414,27 @@ public boolean isEndOfStream(String nextElement) {
}
}
 
+   private class CustomDeserializationSchema implements RMQDeserializationSchema<String> {
+   @Override
+   public TypeInformation<String> getProducedType() {
+   return TypeExtractor.getForClass(String.class);
+   }
+
+   @Override
+   public void processMessage(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector<String> collector) throws IOException {
+   List<String> messages = new ArrayList<>();
+   messages.add("I Love Turtles");
+   messages.add("Brush your teeth");

Review comment:
   I'm sorry, I'm known to play around with test strings :D I can change it... but then, what do you have against teeth brushing!? Explain yourself, mister! :D









[GitHub] [flink] senegalo commented on a change in pull request #12056: [FLINK-17502] [flink-connector-rabbitmq] RMQSource refactor

2020-06-28 Thread GitBox


senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r446636112



##
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##
@@ -208,29 +252,40 @@ public void close() throws Exception {
}
}
 
+   /**
+* Parses and collects the body of an AMQP message.
+*
+* If any of the constructors with the {@link DeserializationSchema} class was used to construct the source,
+* it uses {@link DeserializationSchema#deserialize(byte[])} to parse the body of the AMQP message.
+*
+* If any of the constructors with the {@link RMQDeserializationSchema} class was used to construct the source, it uses the
+* {@link RMQDeserializationSchema#processMessage(Envelope, AMQP.BasicProperties, byte[], RMQDeserializationSchema.RMQCollector)}
+* method of that provided instance.
+*
+* @param delivery the AMQP {@link QueueingConsumer.Delivery}
+* @param collector a {@link RMQCollectorImpl} to collect the data
+* @throws IOException
+*/
+   protected void processMessage(QueueingConsumer.Delivery delivery, RMQDeserializationSchema.RMQCollector<OUT> collector) throws IOException {
+   AMQP.BasicProperties properties = delivery.getProperties();
+   byte[] body = delivery.getBody();
+
+   if (deliveryDeserializer != null) {
+   Envelope envelope = delivery.getEnvelope();
+   deliveryDeserializer.processMessage(envelope, properties, body, collector);
+   } else {
+   collector.setMessageIdentifiers(properties.getCorrelationId(), delivery.getEnvelope().getDeliveryTag());
+   collector.collect(schema.deserialize(body));
+   }
+   }

Review comment:
   Actually, after your awesome suggestion of wrapping the deserialization schema into the new interface, this method is now:
   
   ```java
   AMQP.BasicProperties properties = delivery.getProperties();
   byte[] body = delivery.getBody();
   Envelope envelope = delivery.getEnvelope();
   deliveryDeserializer.deserialize(envelope, properties, body, collector);
   ```
   Hence my comment on top: "Elegant solution!"
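A dependency-free sketch of that wrapping idea (an adapter). `BodySchema`, `IdCollector`, `DeliverySchema` and `BodySchemaWrapper` are hypothetical stand-ins for `DeserializationSchema`, `RMQCollector`, `RMQDeserializationSchema` and the suggested wrapper; the real interfaces take an `Envelope` and `AMQP.BasicProperties` rather than a plain correlation ID and delivery tag.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for DeserializationSchema: body bytes -> record.
interface BodySchema<T> {
    T deserialize(byte[] body);
}

// Hypothetical, simplified stand-in for RMQCollector.
interface IdCollector<T> {
    void setMessageIdentifiers(String correlationId, long deliveryTag);
    void collect(T record);
}

// Hypothetical, simplified stand-in for RMQDeserializationSchema.
interface DeliverySchema<T> {
    void deserialize(String correlationId, long deliveryTag, byte[] body, IdCollector<T> collector);
}

/** Adapter: lets the source keep a single code path through DeliverySchema. */
final class BodySchemaWrapper<T> implements DeliverySchema<T> {
    private final BodySchema<T> schema;

    BodySchemaWrapper(BodySchema<T> schema) {
        this.schema = schema;
    }

    @Override
    public void deserialize(String correlationId, long deliveryTag, byte[] body, IdCollector<T> collector) {
        // Same steps as the old else-branch of processMessage: identifiers first, then the record.
        collector.setMessageIdentifiers(correlationId, deliveryTag);
        collector.collect(schema.deserialize(body));
    }
}
```

With the plain-schema constructors wrapped this way, the source no longer needs the `deliveryDeserializer != null` branch.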









[GitHub] [flink] senegalo commented on a change in pull request #12056: [FLINK-17502] [flink-connector-rabbitmq] RMQSource refactor

2020-06-28 Thread GitBox


senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r446635690



##
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+   /**
+* This method takes all the RabbitMQ delivery information supplied by the client, extracts the data and passes it to the
+* collector.
+* NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with
+* the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled
+* in the {@link RMQSource}.
+* @param envelope
+* @param properties
+* @param body
+* @throws IOException
+*/
+   public void processMessage(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector<T> collector) throws IOException;
+
+   /**
+* Method to decide whether the element signals the end of the stream. If true is returned, the element won't be emitted.
+*
+* @param nextElement The element to test for the end-of-stream signal.
+* @return True, if the element signals end of stream, false otherwise.
+*/
+   public boolean isEndOfStream(T nextElement);

Review comment:
   Nitpick all you want!
   
   `isEndOfStream` is closed to the public by order of @austince until further notice!









[GitHub] [flink] senegalo commented on a change in pull request #12056: [FLINK-17502] [flink-connector-rabbitmq] RMQSource refactor

2020-06-28 Thread GitBox


senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r446635536



##
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+   /**
+* This method takes all the RabbitMQ delivery information supplied by the client, extracts the data and passes it to the
+* collector.
+* NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with
+* the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled
+* in the {@link RMQSource}.
+* @param envelope
+* @param properties
+* @param body
+* @throws IOException
+*/
+   public void processMessage(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector<T> collector) throws IOException;
+
+   /**
+* Method to decide whether the element signals the end of the stream. If true is returned, the element won't be emitted.
+*
+* @param nextElement The element to test for the end-of-stream signal.
+* @return True, if the element signals end of stream, false otherwise.
+*/
+   public boolean isEndOfStream(T nextElement);
+
+   /**
+* The {@link TypeInformation} for the deserialized T.
+* As an example, the proper implementation of this method if T is a String is:
+* {@code return TypeExtractor.getForClass(String.class)}
+* @return TypeInformation
+*/
+   public TypeInformation<T> getProducedType();
+
+
+   /**
+* Special collector for RMQ messages.
+* Captures the correlation ID and delivery tag, and also does the filtering logic for whether a message has been
+* processed or not.
+* @param <T>
+*/
+   public interface RMQCollector<T> extends Collector<T> {
+   public void collect(List<T> records);
+
+   public void setMessageIdentifiers(String correlationId, long deliveryTag);
+   }

Review comment:
   The point of having the `setMessageIdentifiers` method was to make sure that the user supplies the `correlationId` and `deliveryTag` before calling either `collect(T record)` or `collect(List<T> records)`, since those are used to validate that the record has not been added before.
   
   If we go with the proposed interface, we will always have to call the `collect(String correlationId, long deliveryTag, T record)` method, and if you call `void collect(T record)` we either run it without the checks, because the `correlationId` and `deliveryTag` were not supplied, or throw an error.
   
   I also didn't understand the `reset()` method :D and its relationship with the state.
   
   I'll keep the interface as is until you guys clarify this little bit for me.









[GitHub] [flink] senegalo commented on a change in pull request #12056: [FLINK-17502] [flink-connector-rabbitmq] RMQSource refactor

2020-06-28 Thread GitBox


senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r446633295



##
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##
@@ -77,6 +78,7 @@
private final RMQConnectionConfig rmqConnectionConfig;
protected final String queueName;
private final boolean usesCorrelationId;
+   protected RMQDeserializationSchema deliveryDeserializer;
protected DeserializationSchema schema;

Review comment:
   Just a curious question (I'm not that good with Java, tbh!):
   Why a static function here? Why not a private method, since it will only be used from within the class?
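Static and private are not mutually exclusive: a helper can be `private static`. One common reason to make such a helper static (an assumption here, since the suggested code isn't quoted in this thread) is that the arguments of an explicit `this(...)` constructor call cannot refer to instance methods. `ExampleSource` and `wrap` are hypothetical names.

```java
import java.nio.charset.Charset;
import java.util.function.Function;

/** Hypothetical example: a convenience constructor that adapts its argument before delegating. */
class ExampleSource {
    private final Function<byte[], String> deserializer;

    ExampleSource(String charsetName) {
        // this(someInstanceMethod(charsetName)) would not compile: an explicit constructor
        // invocation may not call instance methods, so the helper has to be static.
        this(wrap(charsetName));
    }

    ExampleSource(Function<byte[], String> deserializer) {
        this.deserializer = deserializer;
    }

    // private AND static: only used inside this class, and usable inside the this(...) arguments.
    private static Function<byte[], String> wrap(String charsetName) {
        Charset charset = Charset.forName(charsetName);
        return body -> new String(body, charset);
    }

    String decode(byte[] body) {
        return deserializer.apply(body);
    }
}
```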





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] senegalo commented on a change in pull request #12056: [FLINK-17502] [flink-connector-rabbitmq] RMQSource refactor

2020-06-28 Thread GitBox


senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r446633193



##
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##
@@ -77,6 +78,7 @@
private final RMQConnectionConfig rmqConnectionConfig;
protected final String queueName;
private final boolean usesCorrelationId;
+   protected RMQDeserializationSchema deliveryDeserializer;
protected DeserializationSchema schema;

Review comment:
   Elegant solution !! 




