[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user asfgit closed the pull request at: https://github.com/apache/twill/pull/16 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r95203552

--- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
     /**
      * Invoked when new messages is available.
+     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
--- End diff --

I think the `FetchedMessage` doesn't carry the "current" offset, only the offset of the next message, right? That means one cannot keep consuming the same message (e.g. when there is a failure and it needs to retry the same message).
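The retry concern can be illustrated with a message type that exposes its own offset: to have a failed message delivered again, the callback returns that message's offset instead of the offset after it. `Msg`, `RetryingCallback` and the in-memory fetch loop are hypothetical, written only for illustration; they are not Twill classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical message type exposing both its own offset and the offset after it. */
final class Msg {
  final long offset;
  final String payload;

  Msg(long offset, String payload) {
    this.offset = offset;
    this.payload = payload;
  }

  long nextOffset() {
    return offset + 1;
  }
}

/** Hypothetical callback that retries a failing message by returning its own offset. */
final class RetryingCallback {
  final List<String> processed = new ArrayList<>();
  private int failuresLeft;

  RetryingCallback(int failuresLeft) {
    this.failuresLeft = failuresLeft;
  }

  /** Returns the offset the consumer should fetch from next. */
  long onReceived(long startOffset, Iterator<Msg> messages) {
    long next = startOffset;
    while (messages.hasNext()) {
      Msg m = messages.next();
      if (m.payload.equals("flaky") && failuresLeft > 0) {
        failuresLeft--;
        return m.offset; // re-read this same message on the next fetch
      }
      processed.add(m.payload);
      next = m.nextOffset();
    }
    return next;
  }

  /** Toy consumer loop: each fetch delivers everything from the requested offset onward. */
  static List<String> run(List<String> log, int failures) {
    RetryingCallback cb = new RetryingCallback(failures);
    long offset = 0;
    while (offset < log.size()) {
      List<Msg> batch = new ArrayList<>();
      for (long o = offset; o < log.size(); o++) {
        batch.add(new Msg(o, log.get((int) o)));
      }
      offset = cb.onReceived(offset, batch.iterator());
    }
    return cb.processed;
  }
}
```

With `RetryingCallback.run(Arrays.asList("a", "flaky", "b"), 2)`, the "flaky" message is fetched three times and each payload ends up processed exactly once.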
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user maochf commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r9505

--- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
     /**
      * Invoked when new messages is available.
+     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
--- End diff --

Then `startOffset` is not necessary here? `onReceived` can just keep track of the first message's offset itself, and the caller should guarantee that no empty iterator is passed to `onReceived`, as in `SimpleKafkaConsumer`. Or should we still keep `startOffset` to allow an empty iterator to be passed in?
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r94918807

--- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
     /**
      * Invoked when new messages is available.
+     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
--- End diff --

Wrap it with a peeking iterator from Guava.
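Guava's `Iterators.peekingIterator(Iterator)` returns a `PeekingIterator` whose `peek()` returns the next element without consuming it, so a callback can look at the first message and still hand the full iterator on. To stay dependency-free, the sketch below hand-rolls a minimal equivalent (`Peeking`) and models messages as plain `Long` offsets; `PeekDemo.firstOffset` is a hypothetical helper that falls back to the requested offset when the batch is empty.

```java
import java.util.Iterator;

/** Minimal stand-in for Guava's PeekingIterator, kept dependency-free for this sketch. */
final class Peeking<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private boolean hasPeeked;
  private T peeked;

  Peeking(Iterator<T> delegate) {
    this.delegate = delegate;
  }

  /** Returns the next element without consuming it; throws NoSuchElementException if empty. */
  T peek() {
    if (!hasPeeked) {
      peeked = delegate.next();
      hasPeeked = true;
    }
    return peeked;
  }

  @Override
  public boolean hasNext() {
    return hasPeeked || delegate.hasNext();
  }

  @Override
  public T next() {
    if (!hasPeeked) {
      return delegate.next();
    }
    T result = peeked;
    hasPeeked = false;
    peeked = null;
    return result;
  }
}

final class PeekDemo {
  /** Hypothetical helper: offset of the first message, or the requested offset if there is none. */
  static long firstOffset(Peeking<Long> offsets, long requestedOffset) {
    return offsets.hasNext() ? offsets.peek() : requestedOffset;
  }
}
```

Because `peek()` does not advance the iterator, the same `Peeking` instance can be passed to the user callback afterwards and it will still start at the first message.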
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user maochf commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r94915705

--- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
     /**
      * Invoked when new messages is available.
+     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
--- End diff --

How can I get the offset of the first message in this case? `message.getNextOffset() - 1`?
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r94914909

--- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
     /**
      * Invoked when new messages is available.
+     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
--- End diff --

Not exactly. E.g. let's say the offset submitted to Kafka for fetching is "0", but the message with offset "0" is already gone, so the first message actually fetched has a larger offset.
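A toy model of this example: the consumer asks for offset 0, but offsets 0 to 4 have already been removed, so the first message delivered has offset 5. `ToyPartition` and its clamping rule are hypothetical simplifications, not Kafka or Twill API; a real client may reach the earliest offset differently (for example after an offset-out-of-range reset).

```java
import java.util.ArrayList;
import java.util.List;

/** Toy partition log whose oldest messages were removed by retention (hypothetical model). */
final class ToyPartition {
  private final long earliestOffset; // first offset still stored
  private final long endOffset;      // offset the next appended message would receive

  ToyPartition(long earliestOffset, long endOffset) {
    this.earliestOffset = earliestOffset;
    this.endOffset = endOffset;
  }

  /**
   * Offsets served for a fetch starting at requestedOffset. This toy simply clamps the
   * request to the earliest retained offset.
   */
  List<Long> fetch(long requestedOffset) {
    List<Long> served = new ArrayList<>();
    for (long o = Math.max(requestedOffset, earliestOffset); o < endOffset; o++) {
      served.add(o);
    }
    return served;
  }
}
```

Here `new ToyPartition(5, 8).fetch(0)` yields offsets 5, 6 and 7, so a `startOffset` documented as "the offset of the first message in the iterator" would be 5, not the requested 0.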
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user maochf commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r94914331

--- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
     /**
      * Invoked when new messages is available.
+     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
--- End diff --

Yes, the current implementation follows this at line 454. Is it just the caller's responsibility to honor this?
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user maochf commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r94913855

--- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
     /**
      * Invoked when new messages is available.
+     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
--- End diff --

I see. So should we keep the original description of `startOffset`?
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r94913560

--- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
     /**
      * Invoked when new messages is available.
+     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
--- End diff --

I understand the behavior, but is it a good API? As you can see, it is quite difficult to explain, and hence to document correctly. I feel it's much easier to explain and to use if we say that `startOffset` is the offset of the first message in the given iterator.
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r94817038

--- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
@@ -189,6 +194,49 @@ public void finished() {
   }
   @Test
+  public void testKafkaClientSkipNext() throws Exception {
+    String topic = "testClient";
+    // Publish 30 messages with indecies the same as offsets within the range 0 - 29
+    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
+    t1.start();
+    t1.join();
+    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
+    t2.start();
+    t2.join();
+    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
+    t3.start();
+    t3.join();
+
+    final CountDownLatch stopLatch = new CountDownLatch(1);
+    final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
+    Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(
+      new KafkaConsumer.MessageCallback() {
+        @Override
+        public long onReceived(long startOffset, Iterator<FetchedMessage> messages) {
+          if (messages.hasNext()) {
+            offsetQueue.offer(startOffset);
+            FetchedMessage message = messages.next();
+            LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
+            return message.getNextOffset() + 1;
+          }
+          return startOffset;
+        }
+
+        @Override
+        public void finished() {
+          stopLatch.countDown();
+        }
+      });
+    // 15 messages should be in the queue since onReceived returns `message.getNextOffset() + 1` as next offset to read
+    for (int i = 0; i < 30; i += 2) {
+      Assert.assertTrue(i == offsetQueue.poll(60, TimeUnit.SECONDS));
+    }
+    Assert.assertEquals(0, offsetQueue.size());
--- End diff --

You should do an `Assert.assertNull(offsetQueue.poll(2, TimeUnit.SECONDS))` instead.
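The suggestion matters because the callback runs on another thread: `size()` only reports what has arrived at that instant, while `BlockingQueue.poll(timeout, unit)` waits and returns `null` only if nothing arrives within the timeout. A small sketch, with a hypothetical delayed thread standing in for the consumer callback:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class LateArrivalDemo {
  /**
   * A background thread stands in for the consumer callback and offers one extra offset
   * after 200 ms. Returns {size() was 0 at check time, poll(timeout) saw the extra offset}.
   */
  static boolean[] check() {
    final BlockingQueue<Long> offsetQueue = new LinkedBlockingQueue<>();
    Thread lateCallback = new Thread(() -> {
      try {
        Thread.sleep(200);
        offsetQueue.offer(42L);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    lateCallback.start();
    try {
      // Instantaneous check: very likely runs before the late offset arrives.
      boolean sizeWasZero = offsetQueue.size() == 0;
      // Waits up to 2 seconds, so an offset delivered late is still observed (non-null).
      boolean pollSawLateOffset = offsetQueue.poll(2, TimeUnit.SECONDS) != null;
      lateCallback.join();
      return new boolean[] {sizeWasZero, pollSawLateOffset};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

In the test, `Assert.assertNull(offsetQueue.poll(2, TimeUnit.SECONDS))` therefore fails if an unexpected extra offset shows up within two seconds, which an immediate `assertEquals(0, offsetQueue.size())` can miss.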
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r94688950

--- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
@@ -170,11 +172,57 @@ public void testKafkaClient() throws Exception {
     Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
       .MessageCallback() {
       @Override
-      public void onReceived(Iterator<FetchedMessage> messages) {
+      public long onReceived(Iterator<FetchedMessage> messages, long startOffset) {
+        long nextOffset = startOffset;
         while (messages.hasNext()) {
-          LOG.info(Charsets.UTF_8.decode(messages.next().getPayload()).toString());
+          FetchedMessage message = messages.next();
+          LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
           latch.countDown();
         }
+        return nextOffset;
+      }
+
+      @Override
+      public void finished() {
+        stopLatch.countDown();
+      }
+    });
+
+    Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+    cancel.cancel();
+    Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testKafkaClientSkipNext() throws Exception {
+    String topic = "testClient";
+    // Publish 30 messages with indecies the same as offsets within the range 0 - 29
+    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
+    t1.start();
+    t1.join();
+    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
+    t2.start();
+    t2.join();
+    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
+    t3.start();
+    t3.join();
+
+    // 15 messages will be counted since onReceived returns `message.getNextOffset() + 1` as next offset to read
--- End diff --

I don't think the test is correct. You published 30 messages in three message sets, hence the `onReceived` method will be called three times. The first call gets messages 0-9, and you return 11. The second call gets 11-19, and you return 20. The last call gets 21-29. So in total there will be more than 15 messages.
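To make the counting argument concrete, here is a simplified simulation. It assumes, as the comment does, that each `onReceived` call receives exactly one message set (from the requested offset to the end of that set), and that the callback consumes every message in the batch before returning the last message's next offset plus one. `MessageSetSimulation` is hypothetical; it is neither the Twill test nor the Twill client.

```java
import java.util.ArrayList;
import java.util.List;

final class MessageSetSimulation {
  static final int SET_SIZE = 10; // three publish threads, 10 messages each
  static final int TOTAL = 30;

  /** Simplified fetch: messages from offset up to the end of the message set containing it. */
  static List<Long> fetch(long offset) {
    List<Long> batch = new ArrayList<>();
    long setEnd = (offset / SET_SIZE + 1) * SET_SIZE;
    for (long o = offset; o < Math.min(setEnd, TOTAL); o++) {
      batch.add(o);
    }
    return batch;
  }

  /** Callback under discussion: consume the whole batch, then return nextOffset + 1. */
  static long onReceived(long startOffset, List<Long> batch, List<Long> consumed) {
    long nextOffset = startOffset;
    for (long offset : batch) {
      consumed.add(offset);
      nextOffset = offset + 1; // plays the role of getNextOffset() for this message
    }
    return batch.isEmpty() ? startOffset : nextOffset + 1;
  }

  /** Drives the fetch/callback loop from offset 0 and returns every consumed offset. */
  static List<Long> run() {
    List<Long> consumed = new ArrayList<>();
    long offset = 0;
    while (offset < TOTAL) {
      offset = onReceived(offset, fetch(offset), consumed);
    }
    return consumed;
  }
}
```

Under this model the loop consumes 28 of the 30 messages and skips only offsets 10 and 20, so an expectation of 15 does not hold.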
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r94688058

--- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
@@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
     final AtomicBoolean stopped = new AtomicBoolean();
     return new MessageCallback() {
       @Override
-      public void onReceived(final Iterator<FetchedMessage> messages) {
+      public long onReceived(final Iterator<FetchedMessage> messages, final long startOffset) {
         if (stopped.get()) {
-          return;
+          return startOffset;
         }
-        Futures.getUnchecked(executor.submit(new Runnable() {
+        return Futures.getUnchecked(executor.submit(new Callable<Long>() {
+          long nextOffset = startOffset;
--- End diff --

You don't need this local variable. If the consumer is stopped, the callable always returns `startOffset`; otherwise, it returns whatever the `callback` delegate returns.
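A plain-JDK sketch of the shape this comment suggests: the `Callable` returns `startOffset` if the consumer has been stopped and otherwise returns the delegate's result directly, with no extra field. `OffsetCallback` is a simplified, hypothetical stand-in for `KafkaConsumer.MessageCallback` (payloads are `String`s), and `getUnchecked` is only a rough stand-in for Guava's `Futures.getUnchecked`, which wraps failures differently.

```java
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/** Simplified, hypothetical callback shape: returns the offset to fetch next. */
interface OffsetCallback {
  long onReceived(Iterator<String> messages, long startOffset);
}

final class WrappedCallbacks {
  /** Runs the delegate on the executor and returns whatever the delegate returns. */
  static OffsetCallback wrap(final OffsetCallback delegate, final ExecutorService executor,
                             final AtomicBoolean stopped) {
    return new OffsetCallback() {
      @Override
      public long onReceived(final Iterator<String> messages, final long startOffset) {
        if (stopped.get()) {
          return startOffset;
        }
        Future<Long> result = executor.submit(new Callable<Long>() {
          @Override
          public Long call() {
            // The consumer may have been stopped while this task was queued.
            if (stopped.get()) {
              return startOffset;
            }
            return delegate.onReceived(messages, startOffset);
          }
        });
        return getUnchecked(result);
      }
    };
  }

  /** Rough plain-JDK stand-in for Guava's Futures.getUnchecked. */
  private static long getUnchecked(Future<Long> future) {
    try {
      return future.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }
}
```

Since the `Callable` already encodes both outcomes, the wrapper needs no mutable `nextOffset` state of its own.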
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user chtyim commented on a diff in the pull request: https://github.com/apache/twill/pull/16#discussion_r94687750

--- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -35,12 +35,14 @@
      * Invoked when new messages is available.
      * @param messages Iterator of new messages. The {@link FetchedMessage} instance maybe reused in the Iterator
      *                 and across different invocation.
+     * @param startOffset Offset of the current message to be consumed.
+     * @return The offset of the message to be fetched next.
      */
-    void onReceived(Iterator<FetchedMessage> messages);
+    long onReceived(Iterator<FetchedMessage> messages, long startOffset);
--- End diff --

Can you reverse the order of the parameters? Logically, it fetches from `startOffset` to produce the `Iterator`.
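A hypothetical sketch of the suggested ordering, where the offset used for the fetch comes first and the iterator produced by that fetch comes second. `MessageCallback<T>` and `FetchLoop` here are illustrative only and are not the Twill interface.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical callback with the suggested parameter order: fetch offset first. */
interface MessageCallback<T> {
  /** Returns the offset to fetch from on the next round. */
  long onReceived(long startOffset, Iterator<T> messages);
}

final class FetchLoop {
  /** Toy driver: fetch from the current offset, hand the result to the callback, repeat. */
  static <T> long drive(List<T> log, MessageCallback<T> callback, long startOffset, int rounds) {
    long offset = startOffset;
    for (int i = 0; i < rounds && offset < log.size(); i++) {
      // The iterator is produced by fetching from `offset`, which is why it is passed first.
      Iterator<T> fetched = log.subList((int) offset, log.size()).iterator();
      offset = callback.onReceived(offset, fetched);
    }
    return offset;
  }
}
```

For example, a callback that consumes one message per call and returns `startOffset + 1` advances a three-message log to offset 3 after three rounds.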