[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/twill/pull/16


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-09 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r95203552
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

I think the `FetchedMessage` doesn't carry the "current" offset, but the 
next message offset, right? Meaning one cannot keep consuming the same message 
(e.g. when some failure requires a retry on the same message).
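
The retry case raised above can be sketched as follows. This is only an 
illustration under stated assumptions: `RetrySketch`, `Msg` and the `poison` 
parameter are hypothetical stand-ins, not Twill classes. `Msg` mimics a message 
that exposes only the offset of the next message, so the callback has to track 
the current offset itself, starting from `startOffset`.

```java
import java.util.Iterator;

// Hypothetical sketch; none of these types are part of Twill.
final class RetrySketch {

  // Stand-in for a fetched message that only knows the offset of the NEXT message.
  static final class Msg {
    final String payload;
    final long nextOffset;

    Msg(String payload, long nextOffset) {
      this.payload = payload;
      this.nextOffset = nextOffset;
    }
  }

  // Returns the offset to fetch from next. When processing "fails" (payload equals
  // the poison value), the offset of the failed message is returned so that the
  // same message is delivered again on the next fetch.
  static long onReceived(long startOffset, Iterator<Msg> messages, String poison) {
    long current = startOffset;          // offset of the message about to be processed
    while (messages.hasNext()) {
      Msg message = messages.next();
      if (message.payload.equals(poison)) {
        return current;                  // retry from the failed message
      }
      current = message.nextOffset;      // advance past the processed message
    }
    return current;
  }
}
```

Without `startOffset` the callback could not compute the retry offset for the 
first message of a batch, which is the point of the question above.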




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-06 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r9505
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

Then `startOffset` is not necessary here? `onReceived` can just keep the 
first message's offset by itself, and the caller should guarantee that no empty 
iterator is passed to `onReceived`, as in `SimpleKafkaConsumer`. Or should we 
still keep `startOffset` to allow an empty iterator to be passed in?




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-06 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94918807
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

Wrap it with a peeking iterator from Guava.
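
A minimal sketch of what a peeking iterator does, written in plain Java so it 
runs without dependencies. `PeekingSketch` is a hypothetical class for 
illustration only; Guava offers the same idea through 
`Iterators.peekingIterator(iterator)`, which returns a `PeekingIterator` with a 
`peek()` method. The elements here are assumed to be plain offsets.

```java
import java.util.Iterator;

// Hypothetical stand-in for Guava's PeekingIterator, for illustration only.
final class PeekingSketch<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private T peeked;
  private boolean hasPeeked;

  PeekingSketch(Iterator<T> delegate) {
    this.delegate = delegate;
  }

  // Returns the next element without consuming it.
  // Throws NoSuchElementException (from the delegate) if there is none.
  T peek() {
    if (!hasPeeked) {
      peeked = delegate.next();
      hasPeeked = true;
    }
    return peeked;
  }

  @Override
  public boolean hasNext() {
    return hasPeeked || delegate.hasNext();
  }

  @Override
  public T next() {
    if (!hasPeeked) {
      return delegate.next();
    }
    T result = peeked;
    peeked = null;
    hasPeeked = false;
    return result;
  }
}
```

The caller can `peek()` at the first element to learn its offset and still 
hand the untouched iterator to `onReceived`.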




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-06 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94915705
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

How can I get the offset of the first message in this case? 
`message.getNextOffset()--`?




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-06 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94914909
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

Not exactly. E.g. let's say the offset submitted to Kafka for fetching is 
"0", but the message with offset "0" is already gone, so the actual first 
message fetched has a larger offset.




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-06 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94914331
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

Yes, the current implementation follows this at line 454. Is it just the 
caller's responsibility to honor this?




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-06 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94913855
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

I see. So keep the original description of `startOffset`?




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-06 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94913560
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

I understand the behavior, but is it a good API? As you see, it is quite 
difficult to explain, and hence to document correctly. I feel it's much easier 
to explain and to use if we say the `startOffset` is the offset of the first 
message in the given iterator.




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-05 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94817038
  
--- Diff: 
twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
@@ -189,6 +194,49 @@ public void finished() {
   }
 
   @Test
+  public void testKafkaClientSkipNext() throws Exception {
+String topic = "testClient";
+// Publish 30 messages with indecies the same as offsets within the 
range 0 - 29
+Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, 
"GZIP Testing message", 10);
+t1.start();
+t1.join();
+Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, 
"Testing message", 10, 10);
+t2.start();
+t2.join();
+Thread t3 = createPublishThread(kafkaClient, topic, 
Compression.SNAPPY, "Snappy Testing message", 10, 20);
+t3.start();
+t3.join();
+
+final CountDownLatch stopLatch = new CountDownLatch(1);
+final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
+Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 
0).consume(
+  new KafkaConsumer.MessageCallback() {
+  @Override
+  public long onReceived(long startOffset, Iterator<FetchedMessage> messages) {
+if (messages.hasNext()) {
+  offsetQueue.offer(startOffset);
+  FetchedMessage message = messages.next();
+  LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
+  return message.getNextOffset() + 1;
+}
+return startOffset;
+  }
+
+  @Override
+  public void finished() {
+stopLatch.countDown();
+  }
+});
+// 15 messages should be in the queue since onReceived returns 
`message.getNextOffset() + 1` as next offset to read
+for (int i = 0; i < 30; i += 2) {
+  Assert.assertTrue(i == offsetQueue.poll(60, TimeUnit.SECONDS));
+}
+Assert.assertEquals(0, offsetQueue.size());
--- End diff --

This should do an `Assert.assertNull(offsetQueue.poll(2, TimeUnit.SECONDS))` 
instead.
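
A small sketch of why a timed `poll` is the stronger check when offsets are 
added from another thread. `PollVsSizeSketch` and the 100 ms delay are 
assumptions made up for this illustration; only `BlockingQueue.poll(timeout, 
unit)` and `size()` are real JDK calls.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch comparing an instantaneous size check with a timed poll.
final class PollVsSizeSketch {

  // True only if nothing arrives in the queue within the timeout.
  static boolean staysEmpty(BlockingQueue<Long> queue, long timeoutMillis) {
    try {
      return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS) == null;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Simulates a callback that delivers one unexpected offset 100 ms late.
  // Returns {sizeCheckPasses, pollCheckPasses}.
  static boolean[] lateDelivery() {
    final BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
    Thread producer = new Thread(() -> {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        return;
      }
      queue.offer(30L);
    });
    producer.start();
    boolean sizeCheckPasses = queue.size() == 0;       // looks empty right now
    boolean pollCheckPasses = staysEmpty(queue, 2000); // waits and sees the late offset
    return new boolean[] {sizeCheckPasses, pollCheckPasses};
  }
}
```

In this sketch the size check passes even though an extra offset is about to 
arrive, while the timed poll notices it, which matches the suggestion above.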




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-04 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94688950
  
--- Diff: 
twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
@@ -170,11 +172,57 @@ public void testKafkaClient() throws Exception {
 Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 
0).consume(new KafkaConsumer
   .MessageCallback() {
   @Override
-  public void onReceived(Iterator<FetchedMessage> messages) {
+  public long onReceived(Iterator<FetchedMessage> messages, long startOffset) {
+long nextOffset = startOffset;
 while (messages.hasNext()) {
-  
LOG.info(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+  FetchedMessage message = messages.next();
+  LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
   latch.countDown();
 }
+return nextOffset;
+  }
+
+  @Override
+  public void finished() {
+stopLatch.countDown();
+  }
+});
+
+Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+cancel.cancel();
+Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testKafkaClientSkipNext() throws Exception {
+String topic = "testClient";
+// Publish 30 messages with indecies the same as offsets within the 
range 0 - 29
+Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, 
"GZIP Testing message", 10);
+t1.start();
+t1.join();
+Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, 
"Testing message", 10, 10);
+t2.start();
+t2.join();
+Thread t3 = createPublishThread(kafkaClient, topic, 
Compression.SNAPPY, "Snappy Testing message", 10, 20);
+t3.start();
+t3.join();
+
+// 15 messages will be counted since onReceived returns 
`message.getNextOffset() + 1` as next offset to read
--- End diff --

I don't think the test is correct. You published 30 messages in three 
message sets, hence the `onReceived` method will be called three times. The 
first time with messages 0-9 and you return 11. The second call with 11-19, and 
you return 21. The last call with 21-29. So in total there will be more than 15 
messages.
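
The counting argument above can be checked with a small simulation. This is a 
simplified model, not Kafka or Twill code: `SkipNextSketch` is hypothetical, 
and it assumes each fetch delivers the remaining messages of the 10-message set 
that contains the requested offset, and that the callback returns the last 
message's next offset plus one.

```java
// Hypothetical model of the fetch loop: 30 messages in three sets of 10.
final class SkipNextSketch {
  static final int TOTAL = 30;
  static final int SET_SIZE = 10;

  // Returns how many messages the callback sees in total.
  // consumeWholeBatch = true: the callback drains the iterator on every call.
  // consumeWholeBatch = false: the callback reads only the first message per call.
  static int countConsumed(boolean consumeWholeBatch) {
    int consumed = 0;
    long fetchOffset = 0;
    while (fetchOffset < TOTAL) {
      // Exclusive end of the message set containing fetchOffset.
      long setEnd = (fetchOffset / SET_SIZE + 1) * SET_SIZE;
      long lastNextOffset = fetchOffset;
      for (long offset = fetchOffset; offset < setEnd; offset++) {
        consumed++;
        lastNextOffset = offset + 1;   // what getNextOffset() would report
        if (!consumeWholeBatch) {
          break;
        }
      }
      fetchOffset = lastNextOffset + 1; // callback returns nextOffset + 1
    }
    return consumed;
  }
}
```

Under this model, draining every batch yields 10 + 9 + 9 = 28 messages, while 
reading only the first message of each call yields exactly 15.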




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-04 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94688058
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
 ---
@@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final 
MessageCallback callback,
   final AtomicBoolean stopped = new AtomicBoolean();
   return new MessageCallback() {
 @Override
-public void onReceived(final Iterator<FetchedMessage> messages) {
+public long onReceived(final Iterator<FetchedMessage> messages, final long startOffset) {
   if (stopped.get()) {
-return;
+return startOffset;
   }
-  Futures.getUnchecked(executor.submit(new Runnable() {
+  return Futures.getUnchecked(executor.submit(new Callable<Long>() {
+long nextOffset = startOffset;
--- End diff --

You don't need this local variable. If the consumer is stopped, it always 
returns the `startOffset` inside the callable. Otherwise, the callable returns 
whatever is returned by the `callback` delegate.
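
A sketch of the simplification described above, using only JDK classes. 
Everything here is an assumption for illustration: `WrapCallbackSketch` and its 
`Callback` interface are hypothetical, the parameter order puts `startOffset` 
first, and plain `Future.get()` replaces Guava's `Futures.getUnchecked`.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; not the actual SimpleKafkaConsumer code.
final class WrapCallbackSketch {

  interface Callback<T> {
    long onReceived(long startOffset, Iterator<T> messages);
  }

  // Runs the delegate on the executor. No extra local variable is needed:
  // a stopped consumer returns startOffset, otherwise the delegate's result is returned.
  static <T> Callback<T> wrap(final Callback<T> delegate,
                              final ExecutorService executor,
                              final AtomicBoolean stopped) {
    return (startOffset, messages) -> {
      if (stopped.get()) {
        return startOffset;
      }
      Callable<Long> task =
          () -> stopped.get() ? startOffset : delegate.onReceived(startOffset, messages);
      try {
        return executor.submit(task).get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return startOffset;
      } catch (ExecutionException e) {
        throw new RuntimeException(e.getCause());
      }
    };
  }

  // Usage: returns {offset while running, offset after stop}.
  static long[] demo() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      AtomicBoolean stopped = new AtomicBoolean();
      Callback<String> wrapped =
          WrapCallbackSketch.<String>wrap((start, msgs) -> start + 3, executor, stopped);
      long running = wrapped.onReceived(10L, Collections.<String>emptyIterator());
      stopped.set(true);
      long afterStop = wrapped.onReceived(10L, Collections.<String>emptyIterator());
      return new long[] {running, afterStop};
    } finally {
      executor.shutdown();
    }
  }
}
```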




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-04 Thread chtyim
Github user chtyim commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94687750
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -35,12 +35,14 @@
  * Invoked when new messages is available.
  * @param messages Iterator of new messages. The {@link 
FetchedMessage} instance maybe reused in the Iterator
  * and across different invocation.
+ * @param startOffset Offset of the current message to be consumed.
+ * @return The offset of the message to be fetched next.
  */
-void onReceived(Iterator<FetchedMessage> messages);
+long onReceived(Iterator<FetchedMessage> messages, long startOffset);
--- End diff --

Can you reverse the order of the parameters? Logically it fetches from 
the `startOffset` to produce the `Iterator`.

