[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r89042093

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java ---
    @@ -0,0 +1,36 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.twill.kafka.client;
    +
    +/**
    + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
    + * not met, the method will return the next offset to continue searching for the message meeting this condition.
    + */
    +public interface KafkaOffsetProvider {
    +
    +  /**
    +   * Check whether a message meets a given condition. If the condition is not met, return the next offset to
    +   * continue searching for the message meeting this condition.
    +   * @param message {@link FetchedMessage} to check.
    +   * @return A {@code long} larger than zero as the next offset to continue searching for the message meeting the
    +   *         given condition if the current message doesn't meet the condition. Return {code 0} if the current
    +   *         message meets the given condition. Return the earliest offset {@code -2} if no message meeting the
    +   *         condition can be found.
    +   */
    +  public long getCandidateOffset(FetchedMessage message);
    --- End diff --

    I meant that the returned offset is that of the next message to be checked; that message is not guaranteed to satisfy the given condition.

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
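The return convention in the Javadoc above (`0` for a match, a positive value for the next offset to check, `-2` when nothing can match) may be easier to follow with a small driver loop. The following is only a sketch under stated assumptions: `Msg`, `OffsetProvider`, and `search` are hypothetical stand-ins written for this note, not Twill classes, and the in-memory list stands in for a Kafka partition whose offsets equal list indices.

```java
import java.util.List;

final class OffsetProviderSketch {
  /** Sentinel from the quoted Javadoc: the "earliest offset" marker. */
  static final long EARLIEST = -2L;

  /** Hypothetical stand-in for a fetched message: just an offset and a timestamp. */
  static final class Msg {
    final long offset;
    final long timestamp;

    Msg(long offset, long timestamp) {
      this.offset = offset;
      this.timestamp = timestamp;
    }
  }

  /** Hypothetical mirror of the interface under review, using Msg instead of FetchedMessage. */
  interface OffsetProvider {
    long getCandidateOffset(Msg message);
  }

  /**
   * Follows candidate offsets until the provider reports a match (returns 0).
   * Returns the matching offset, or EARLIEST if the search runs off the log.
   */
  static long search(List<Msg> log, long start, OffsetProvider provider) {
    long offset = start;
    // Bound the number of probes so a misbehaving provider cannot loop forever.
    for (int probes = 0; probes < log.size() && offset >= 0 && offset < log.size(); probes++) {
      long candidate = provider.getCandidateOffset(log.get((int) offset));
      if (candidate == 0) {
        return offset;      // the current message meets the condition
      }
      if (candidate < 0) {
        return EARLIEST;    // the provider says no message can match
      }
      offset = candidate;   // next message to check; it is not guaranteed to match
    }
    return EARLIEST;
  }
}
```

As the comment above points out, a positive return value only names the next message to inspect, so the caller has to call the provider again on that message.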
[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r89042464

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java ---
    @@ -0,0 +1,36 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.twill.kafka.client;
    +
    +/**
    + * Define interface that could provide a method to check whether a message meets a given condition. If the condition is
    + * not met, the method will return the next offset to continue searching for the message meeting this condition.
    + */
    +public interface KafkaOffsetProvider {
    --- End diff --

    It will be implemented outside of Twill. The objects will be passed to the constructor of `MessageCallback`.
[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94521832

    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
         final AtomicBoolean stopped = new AtomicBoolean();
         return new MessageCallback() {
           @Override
    -      public void onReceived(final Iterator<FetchedMessage> messages) {
    +      public long onReceived(final Iterator<FetchedMessage> messages) {
             if (stopped.get()) {
    -          return;
    +          return Long.MIN_VALUE;
             }
    -        Futures.getUnchecked(executor.submit(new Runnable() {
    +        return Futures.getUnchecked(executor.submit(new Callable<Long>() {
    +          long nextOffset = Long.MIN_VALUE;
               @Override
    -          public void run() {
    +          public Long call() {
                 if (stopped.get()) {
    -              return;
    +              return nextOffset;
                 }
    -            callback.onReceived(messages);
    +            nextOffset = callback.onReceived(messages);
    --- End diff --

    Because the stored `nextOffset` needs to be returned at line 286.
[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94522513

    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
         final AtomicBoolean stopped = new AtomicBoolean();
         return new MessageCallback() {
           @Override
    -      public void onReceived(final Iterator<FetchedMessage> messages) {
    +      public long onReceived(final Iterator<FetchedMessage> messages) {
             if (stopped.get()) {
    -          return;
    +          return Long.MIN_VALUE;
    --- End diff --

    In this case no message is processed, so the offset should remain unchanged and the next read should start again from the current offset.
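The "offset remains unchanged" behavior described above can be illustrated with a wrapper that returns a sentinel when the consumer has been stopped, plus a caller that ignores the sentinel. This is a minimal sketch, not the code under review: `Callback`, `wrap`, `advance`, and `NO_CHANGE` are hypothetical names, messages are plain strings, and it uses only `java.util.concurrent` rather than the Guava `Futures.getUnchecked` call in the diff.

```java
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

final class StoppableCallbackSketch {
  /** Sentinel meaning "nothing was processed, keep the current offset". */
  static final long NO_CHANGE = Long.MIN_VALUE;

  /** Hypothetical stand-in for a message callback that reports the next offset to read. */
  interface Callback {
    long onReceived(Iterator<String> messages);
  }

  /** Runs the delegate on the executor unless the stopped flag is set. */
  static Callback wrap(final Callback delegate, final ExecutorService executor,
                       final AtomicBoolean stopped) {
    return new Callback() {
      @Override
      public long onReceived(final Iterator<String> messages) {
        if (stopped.get()) {
          return NO_CHANGE;
        }
        try {
          return executor.submit(new Callable<Long>() {
            @Override
            public Long call() {
              // Re-check inside the task: stop may have been requested while queued.
              return stopped.get() ? NO_CHANGE : delegate.onReceived(messages);
            }
          }).get();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        } catch (ExecutionException e) {
          throw new RuntimeException(e.getCause());
        }
      }
    };
  }

  /** Caller side: only move the read position when the callback returned a real offset. */
  static long advance(long currentOffset, long returned) {
    return returned == NO_CHANGE ? currentOffset : returned;
  }
}
```

With this shape, a stopped consumer leaves the stored offset untouched, so a later read would start again from the same position.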
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r9505

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --

    Then `startOffset` is not necessary here? `onReceived` can keep track of the first message's offset by itself, and the caller should guarantee that no empty iterator is passed to `onReceived`, as `SimpleKafkaConsumer` does. Or should we still keep `startOffset` so that an empty iterator can be passed in?
[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94522256

    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -68,7 +69,7 @@
     /**
      * A {@link KafkaConsumer} implementation using the scala kafka api.
      */
    -final class SimpleKafkaConsumer implements KafkaConsumer {
    +public final class SimpleKafkaConsumer implements KafkaConsumer {
    --- End diff --

    I wanted to use the `getLastOffset` method in the test. I haven't figured out a way to move the logic in `getLastOffset` elsewhere without exposing the `SimpleKafkaConsumer` class, because `getLastOffset` uses many private members of `SimpleKafkaConsumer`.
[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94558457

    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final MessageCallback callback,
         final AtomicBoolean stopped = new AtomicBoolean();
         return new MessageCallback() {
           @Override
    -      public void onReceived(final Iterator<FetchedMessage> messages) {
    +      public long onReceived(final Iterator<FetchedMessage> messages) {
             if (stopped.get()) {
    -          return;
    +          return Long.MIN_VALUE;
    --- End diff --

    Maybe pass an `AtomicLong` to this method that holds the initial offset and can be set to the next offset? I just realized this is probably an implementation detail of `SimpleKafkaConsumer` and is not general enough to put in the interface.
[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94558645

    --- Diff: twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java ---
    @@ -68,7 +69,7 @@
     /**
      * A {@link KafkaConsumer} implementation using the scala kafka api.
      */
    -final class SimpleKafkaConsumer implements KafkaConsumer {
    +public final class SimpleKafkaConsumer implements KafkaConsumer {
    --- End diff --

    Yes, it's an implementation detail. I was just wondering about future work.
[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94559673

    --- Diff: twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
    @@ -170,11 +174,128 @@ public void testKafkaClient() throws Exception {
         Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
           .MessageCallback() {
           @Override
    -      public void onReceived(Iterator<FetchedMessage> messages) {
    +      public long onReceived(Iterator<FetchedMessage> messages) {
    +        long nextOffset = Long.MIN_VALUE;
    +        while (messages.hasNext()) {
    +          FetchedMessage message = messages.next();
    +          LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
    +          latch.countDown();
    +        }
    +        return nextOffset;
    +      }
    +
    +      @Override
    +      public void finished() {
    +        stopLatch.countDown();
    +      }
    +    });
    +
    +    Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
    +    cancel.cancel();
    +    Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
    +  }
    +
    +  @Test
    +  public void testKafkaClientReadFromIdx() throws Exception {
    +    String topic = "testClient";
    +
    +    // Publish 30 messages with indecies the same as offsets within the range 0 - 29
    +    Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, "GZIP Testing message", 10);
    +    t1.start();
    +    t1.join();
    +    Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, "Testing message", 10, 10);
    +    t2.start();
    +    t2.join();
    +    Thread t3 = createPublishThread(kafkaClient, topic, Compression.SNAPPY, "Snappy Testing message", 10, 20);
    +    t3.start();
    +    t3.join();
    +
    +    final int startIdx = 15;
    +    final CountDownLatch latch = new CountDownLatch(30 - startIdx);
    +    final CountDownLatch stopLatch = new CountDownLatch(1);
    +    final BlockingQueue offsetQueue = new LinkedBlockingQueue<>();
    +    // Creater a consumer
    +    final SimpleKafkaConsumer consumer = (SimpleKafkaConsumer) kafkaClient.getConsumer();
    +    Cancellable initCancel = kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
    +      .MessageCallback() {
    +      long minOffset = -2; // earliest msg
    +      long maxOffset = -1; // latest msg
    +      @Override
    +      // Use binary search to find the offset of the message with the index matching startIdx. Returns the next offset
    --- End diff --

    Right. Since the offsets and indices of the messages are known, the test can jump directly to the desired index without a binary search.
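The direct jump mentioned above can be sketched as follows, assuming (as in the quoted test) that message indices equal offsets and that the callback's return value is the next offset to read. The class and method names are hypothetical, and the loop simulates a consumer over an in-memory range of offsets rather than a real Kafka topic.

```java
import java.util.ArrayList;
import java.util.List;

final class DirectJumpSketch {
  /**
   * Next offset to read: jump straight to startIdx if the batch begins before it,
   * otherwise move past the batch that was just consumed.
   */
  static long nextOffset(long firstOffsetInBatch, int batchSize, long startIdx) {
    if (firstOffsetInBatch < startIdx) {
      return startIdx;
    }
    return firstOffsetInBatch + batchSize;
  }

  /** Simulates reading a log of logSize messages in batches, consuming only from startIdx on. */
  static List<Long> consumeFrom(int logSize, int batchSize, long startIdx) {
    List<Long> consumed = new ArrayList<>();
    long offset = 0;
    while (offset < logSize) {
      int n = (int) Math.min(batchSize, logSize - offset);
      if (offset >= startIdx) {
        for (long o = offset; o < offset + n; o++) {
          consumed.add(o);
        }
      }
      offset = nextOffset(offset, n, startIdx);
    }
    return consumed;
  }
}
```

For the test's numbers (30 messages, `startIdx` of 15) this consumes offsets 15 through 29, which is 15 messages and matches the `CountDownLatch(30 - startIdx)` in the diff.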
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94914331

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --

    Yes, the current implementation follows this at line 454. Is it just the caller's responsibility to honor this?
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94915705

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --

    How can I get the offset of the first message in this case? `message.getNextOffset()--`?
[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/16#discussion_r94913855

    --- Diff: twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
    @@ -33,14 +33,16 @@
         /**
          * Invoked when new messages is available.
    +     * @param startOffset Offset of the first {@link FetchedMessage} in the iterator of new messages.
    --- End diff --

    I see. So keep the original description of `startOffset`?
[GitHub] twill issue #16: [TWILL-199] Return the offset to read next message in Kafka...
Github user maochf commented on the issue:

    https://github.com/apache/twill/pull/16

    Thank you! Just squashed.
[GitHub] twill pull request #58: [TWILL-240] EventHandler Improvement
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/58#discussion_r131234842

    --- Diff: twill-api/src/main/java/org/apache/twill/api/EventHandler.java ---
    @@ -124,6 +124,75 @@ public void initialize(EventHandlerContext context) {
       }
       /**
    +   * Invoked by the application when it starts.
    +   *
    +   * @param twillAppName name of the current application
    +   * @param runId run ID of current application run
    +   */
    +  public void started(String twillAppName, RunId runId) {
    --- End diff --

    The `EventHandlerContext` just contains an `EventHandlerSpecification`, which doesn't seem to guarantee that the app name and run ID are there?
[GitHub] twill pull request #58: [TWILL-240] EventHandler Improvement
GitHub user maochf opened a pull request:

    https://github.com/apache/twill/pull/58

    [TWILL-240] EventHandler Improvement

    https://issues.apache.org/jira/browse/TWILL-240
    Add started, containerLaunched, containerStopped, completed, killed, aborted to EventHandler

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maochf/twill feature/event-handler-improvement

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/twill/pull/58.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #58

commit 76eeca04586712123c44caf5788c17e0968a
Author: Chengfeng <m...@cask.co>
Date:   2017-07-25T19:53:26Z

    [TWILL-240] Add started, containerLaunched, containerStopped, completed, killed, aborted to EventHandler
[GitHub] twill pull request #58: [TWILL-240] EventHandler Improvement
Github user maochf commented on a diff in the pull request:

    https://github.com/apache/twill/pull/58#discussion_r132298585

    --- Diff: twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java ---
    @@ -193,7 +200,91 @@ private EventHandler createEventHandler(TwillSpecification twillSpec) throws Cla
         Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass),
                                     "Class {} does not implements {}",
                                     handlerClass, EventHandler.class.getName());
    -    return Instances.newInstance((Class) handlerClass);
    +    final EventHandler delegate = Instances.newInstance((Class) handlerClass);
    +    if (delegate == null) {
    +      // if no handler is specified, return an EventHandler with no-op
    +      return new EventHandler() {};
    +    }
    +    // wrap the delegate EventHandler so that all errors will be caught
    +    return new EventHandler() {
    --- End diff --

    Originally, if `EventHandler#initialize` failed, the app would fail to start. Do we want to change this behavior?
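The question above can be made concrete with a sketch of one possible answer: wrap the delegate so that lifecycle callbacks swallow errors, while `initialize` is left unguarded so a failure there still propagates. This only illustrates the trade-off and is not the behavior the pull request settled on. `Handler` is a hypothetical two-method stand-in, not Twill's `EventHandler`.

```java
final class SafeHandlerSketch {
  /** Hypothetical stand-in for an event handler with two lifecycle hooks. */
  public static class Handler {
    public void initialize() { }

    public void started(String appName) { }
  }

  /** Wraps a delegate: started() errors are caught, initialize() errors propagate. */
  public static Handler wrap(final Handler delegate) {
    return new Handler() {
      @Override
      public void initialize() {
        // Not guarded: a failure here reaches the caller and can still abort startup.
        delegate.initialize();
      }

      @Override
      public void started(String appName) {
        try {
          delegate.started(appName);
        } catch (Throwable t) {
          // Guarded: report the failure and let the application continue.
          System.err.println("started() failed for " + appName + ": " + t);
        }
      }
    };
  }
}
```

Guarding `initialize` in the same way as `started` would instead let the application start even when the handler cannot be set up, which is the behavior change the comment asks about.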
[GitHub] twill issue #58: [TWILL-240] EventHandler Improvement
Github user maochf commented on the issue:

    https://github.com/apache/twill/pull/58

    squashed