[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

2016-11-21 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r89042093
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java 
---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+/**
+ * Define interface that could provide a method to check whether a message 
meets a given condition. If the condition is
+ * not met, the method will return the next offset to continue searching 
for the message meeting this condition.
+ */
+public interface KafkaOffsetProvider {
+
+  /**
+   * Check whether a message meets a given condition. If the condition is 
not met, return the next offset to
+   * continue searching for the message meeting this condition.
+   * @param message {@link FetchedMessage} to check.
+   * @return A {@code long} larger than zero as the next offset to 
continue searching for the message meeting the
+   * given condition if the current message doesn't meet the 
condition. Return {@code 0} if the current
+   * message meets the given condition. Return the earliest offset 
{@code -2} if no message meeting the
+   * condition can be found.
+   */
+  public long getCandidateOffset(FetchedMessage message);
--- End diff --

I meant that the returned offset is the next message to be checked, but 
that message is not guaranteed to satisfy the given condition.
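
A minimal sketch of the contract described above, not the code from this pull
request. `Message` is a hypothetical stand-in for `FetchedMessage`, and
`TimestampOffsetProvider` with its timestamp condition is invented purely for
illustration. The `-2` ("no matching message") case is left out.

```java
// Hypothetical stand-in for Twill's FetchedMessage; only what this sketch needs.
final class Message {
  final long offset;
  final long timestamp;

  Message(long offset, long timestamp) {
    this.offset = offset;
    this.timestamp = timestamp;
  }
}

// Illustrative provider: the condition is "timestamp is at or after a target".
final class TimestampOffsetProvider {
  private final long targetTimestamp;

  TimestampOffsetProvider(long targetTimestamp) {
    this.targetTimestamp = targetTimestamp;
  }

  // Returns 0 if the message meets the condition. Otherwise returns the next
  // offset to check. That offset is only a candidate: the message stored there
  // is not guaranteed to meet the condition either.
  long getCandidateOffset(Message message) {
    if (message.timestamp >= targetTimestamp) {
      return 0L;
    }
    return message.offset + 1;
  }
}
```

The caller would keep fetching at the returned offset and asking again until
the provider returns 0.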


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

2016-11-21 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r89042464
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaOffsetProvider.java 
---
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.twill.kafka.client;
+
+/**
+ * Define interface that could provide a method to check whether a message 
meets a given condition. If the condition is
+ * not met, the method will return the next offset to continue searching 
for the message meeting this condition.
+ */
+public interface KafkaOffsetProvider {
--- End diff --

The interface will be implemented outside of Twill. Its instances will be 
passed to the constructor of `MessageCallback`.




[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

2017-01-03 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94521832
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
 ---
@@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final 
MessageCallback callback,
   final AtomicBoolean stopped = new AtomicBoolean();
   return new MessageCallback() {
 @Override
-public void onReceived(final Iterator messages) {
+public long onReceived(final Iterator messages) {
   if (stopped.get()) {
-return;
+return Long.MIN_VALUE;
   }
-  Futures.getUnchecked(executor.submit(new Runnable() {
+  return Futures.getUnchecked(executor.submit(new Callable() 
{
+long nextOffset = Long.MIN_VALUE;
 @Override
-public void run() {
+public Long call() {
   if (stopped.get()) {
-return;
+return nextOffset;
   }
-  callback.onReceived(messages);
+  nextOffset = callback.onReceived(messages);
--- End diff --

Because the stored `nextOffset` should be returned at line 286.
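
A self-contained sketch of the pattern in the diff above, under stated
assumptions: `OffsetCallback` and `CallbackWrapper` are hypothetical names, the
messages are plain strings rather than `FetchedMessage`, and
`Future.get()` replaces Guava's `Futures.getUnchecked`. It only shows how the
value produced inside the `Callable` is stored and handed back to the caller.

```java
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified callback; not Twill's MessageCallback.
interface OffsetCallback {
  long onReceived(Iterator<String> messages);
}

final class CallbackWrapper {
  static final long NO_OFFSET = Long.MIN_VALUE;

  // Runs the delegate on the executor and returns the offset it reports,
  // or NO_OFFSET if the consumer was stopped before the delegate ran.
  static long invoke(ExecutorService executor, final AtomicBoolean stopped,
                     final OffsetCallback delegate, final Iterator<String> messages) {
    Callable<Long> task = new Callable<Long>() {
      @Override
      public Long call() {
        long nextOffset = NO_OFFSET;
        if (stopped.get()) {
          return nextOffset;
        }
        // Store the delegate's result so that the final return hands it back.
        nextOffset = delegate.onReceived(messages);
        return nextOffset;
      }
    };
    try {
      return executor.submit(task).get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    }
  }
}
```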




[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

2017-01-03 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94522513
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
 ---
@@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final 
MessageCallback callback,
   final AtomicBoolean stopped = new AtomicBoolean();
   return new MessageCallback() {
 @Override
-public void onReceived(final Iterator messages) {
+public long onReceived(final Iterator messages) {
   if (stopped.get()) {
-return;
+return Long.MIN_VALUE;
--- End diff --

In this case no message is processed, so the offset should remain unchanged 
and reading should start from the current offset again next time.
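
As a sketch of how a caller might act on that sentinel (an assumption about
the caller, not code from this pull request; `OffsetTracker` and
`resolveNextOffset` are hypothetical names):

```java
final class OffsetTracker {
  // Sentinel meaning "no message was processed" (Long.MIN_VALUE in the diff).
  static final long NO_OFFSET = Long.MIN_VALUE;

  // Chooses the offset for the next fetch. If the callback reported the
  // sentinel, keep the current offset so the same messages are read again.
  static long resolveNextOffset(long currentOffset, long reportedOffset) {
    return reportedOffset == NO_OFFSET ? currentOffset : reportedOffset;
  }
}
```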




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-06 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r9505
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

Then `startOffset` is not necessary here? `onReceived` can keep the first 
message's offset by itself, and the caller should guarantee that no empty 
iterator is passed to `onReceived`, as `SimpleKafkaConsumer` does. Or should 
we still keep `startOffset` so that an empty iterator can be passed in?
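
To make the trade-off concrete, here is a sketch of the variant that keeps
`startOffset`. `StartOffsets` and its nested `Message` are hypothetical
stand-ins (the real callback receives `FetchedMessage` objects); the only
property assumed of a message is that it knows the offset of the message that
follows it.

```java
import java.util.Iterator;

final class StartOffsets {
  // Hypothetical stand-in for FetchedMessage.
  static final class Message {
    final long nextOffset;

    Message(long nextOffset) {
      this.nextOffset = nextOffset;
    }
  }

  // With startOffset passed in, an empty iterator is harmless: nothing was
  // consumed, so the offset to read next is startOffset itself.
  static long onReceived(long startOffset, Iterator<Message> messages) {
    long next = startOffset;
    while (messages.hasNext()) {
      next = messages.next().nextOffset;
    }
    return next;
  }
}
```

Without the parameter, the method has nothing to return for an empty iterator,
which is why the caller would then have to guarantee a non-empty one.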




[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

2017-01-03 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94522256
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
 ---
@@ -68,7 +69,7 @@
 /**
  * A {@link KafkaConsumer} implementation using the scala kafka api.
  */
-final class SimpleKafkaConsumer implements KafkaConsumer {
+public final class SimpleKafkaConsumer implements KafkaConsumer {
--- End diff --

I wanted to use the `getLastOffset` method in the test. I haven't figured out 
a way to extract the logic in `getLastOffset` to another place without 
exposing the `SimpleKafkaConsumer` class, because `getLastOffset` uses a lot 
of private members of `SimpleKafkaConsumer`.




[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

2017-01-04 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94558457
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
 ---
@@ -273,17 +274,19 @@ private MessageCallback wrapCallback(final 
MessageCallback callback,
   final AtomicBoolean stopped = new AtomicBoolean();
   return new MessageCallback() {
 @Override
-public void onReceived(final Iterator messages) {
+public long onReceived(final Iterator messages) {
   if (stopped.get()) {
-return;
+return Long.MIN_VALUE;
--- End diff --

Maybe pass an `AtomicLong` to this method that holds the initial offset and 
can be set to the next offset? I just realized this is probably an 
implementation detail of `SimpleKafkaConsumer` and is not general enough to 
be done in the interface.




[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

2017-01-04 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94558645
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java
 ---
@@ -68,7 +69,7 @@
 /**
  * A {@link KafkaConsumer} implementation using the scala kafka api.
  */
-final class SimpleKafkaConsumer implements KafkaConsumer {
+public final class SimpleKafkaConsumer implements KafkaConsumer {
--- End diff --

Yes, it's an implementation detail. I was just wondering about future work.




[GitHub] twill pull request #16: [TWILL-199] Handle offset error and return next offs...

2017-01-04 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94559673
  
--- Diff: 
twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java ---
@@ -170,11 +174,128 @@ public void testKafkaClient() throws Exception {
 Cancellable cancel = kafkaClient.getConsumer().prepare().add(topic, 0, 
0).consume(new KafkaConsumer
   .MessageCallback() {
   @Override
-  public void onReceived(Iterator messages) {
+  public long onReceived(Iterator messages) {
+long nextOffset = Long.MIN_VALUE;
+while (messages.hasNext()) {
+  FetchedMessage message = messages.next();
+  LOG.info(Charsets.UTF_8.decode(message.getPayload()).toString());
+  latch.countDown();
+}
+return nextOffset;
+  }
+
+  @Override
+  public void finished() {
+stopLatch.countDown();
+  }
+});
+
+Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+cancel.cancel();
+Assert.assertTrue(stopLatch.await(1, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testKafkaClientReadFromIdx() throws Exception {
+String topic = "testClient";
+
+// Publish 30 messages with indices the same as offsets within the 
range 0 - 29
+Thread t1 = createPublishThread(kafkaClient, topic, Compression.GZIP, 
"GZIP Testing message", 10);
+t1.start();
+t1.join();
+Thread t2 = createPublishThread(kafkaClient, topic, Compression.NONE, 
"Testing message", 10, 10);
+t2.start();
+t2.join();
+Thread t3 = createPublishThread(kafkaClient, topic, 
Compression.SNAPPY, "Snappy Testing message", 10, 20);
+t3.start();
+t3.join();
+
+final int startIdx = 15;
+final CountDownLatch latch = new CountDownLatch(30 - startIdx);
+final CountDownLatch stopLatch = new CountDownLatch(1);
+final BlockingQueue offsetQueue = new LinkedBlockingQueue<>();
+// Create a consumer
+final SimpleKafkaConsumer consumer = (SimpleKafkaConsumer) 
kafkaClient.getConsumer();
+Cancellable initCancel = 
kafkaClient.getConsumer().prepare().add(topic, 0, 0).consume(new KafkaConsumer
+  .MessageCallback() {
+  long minOffset = -2; // earliest msg
+  long maxOffset = -1; // latest msg
+  @Override
+  // Use binary search to find the offset of the message with the 
index matching startIdx. Returns the next offset
--- End diff --

Right, since the offsets and indices of the messages are known, the test can 
jump directly to the desired index without a binary search.
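
A sketch of that direct jump, assuming (as in the test above) that each
message's index equals its Kafka offset. `DirectJump` and `nextOffsetFor` are
hypothetical names, not part of the test in this pull request.

```java
final class DirectJump {
  // When index == offset, the offset of the message with index startIdx is
  // startIdx itself, so no search is needed. Before the target, jump straight
  // to it; at or past the target, continue with the following message.
  static long nextOffsetFor(long currentOffset, long startIdx) {
    return currentOffset < startIdx ? startIdx : currentOffset + 1;
  }
}
```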




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-06 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94914331
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

Yes, the current implementation follows this at line 454. Is it just the 
caller's responsibility to honor this?




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-06 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94915705
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

How can I get the offset of the first message in this case? 
`message.getNextOffset()--`?




[GitHub] twill pull request #16: [TWILL-199] Return the offset to read next message i...

2017-01-06 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/16#discussion_r94913855
  
--- Diff: 
twill-core/src/main/java/org/apache/twill/kafka/client/KafkaConsumer.java ---
@@ -33,14 +33,16 @@
 
 /**
  * Invoked when new messages is available.
+ * @param startOffset Offset of the first {@link FetchedMessage} in 
the iterator of new messages.
--- End diff --

I see. So keep the original description of `startOffset`?




[GitHub] twill issue #16: [TWILL-199] Return the offset to read next message in Kafka...

2017-01-10 Thread maochf
Github user maochf commented on the issue:

https://github.com/apache/twill/pull/16
  
Thank you! Just squashed.




[GitHub] twill pull request #58: [TWILL-240] EventHandler Improvement

2017-08-03 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/58#discussion_r131234842
  
--- Diff: twill-api/src/main/java/org/apache/twill/api/EventHandler.java ---
@@ -124,6 +124,75 @@ public void initialize(EventHandlerContext context) {
   }
 
   /**
+   * Invoked by the application when it starts.
+   *
+   * @param twillAppName name of the current application
+   * @param runId run ID of current application run
+   */
+  public void started(String twillAppName, RunId runId) {
--- End diff --

The `EventHandlerContext` just contains an `EventHandlerSpecification`, and 
that doesn't seem to guarantee that the app name and run ID are there?




[GitHub] twill pull request #58: [TWILL-240] EventHandler Improvement

2017-07-26 Thread maochf
GitHub user maochf opened a pull request:

https://github.com/apache/twill/pull/58

[TWILL-240] EventHandler Improvement

https://issues.apache.org/jira/browse/TWILL-240

Add started, containerLaunched, containerStopped, completed, killed, 
aborted to EventHandler

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/maochf/twill feature/event-handler-improvement

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/twill/pull/58.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #58


commit 76eeca04586712123c44caf5788c17e0968a
Author: Chengfeng <m...@cask.co>
Date:   2017-07-25T19:53:26Z

[TWILL-240] Add started, containerLaunched, containerStopped, completed, 
killed, aborted to EventHandler






[GitHub] twill pull request #58: [TWILL-240] EventHandler Improvement

2017-08-09 Thread maochf
Github user maochf commented on a diff in the pull request:

https://github.com/apache/twill/pull/58#discussion_r132298585
  
--- Diff: 
twill-yarn/src/main/java/org/apache/twill/internal/appmaster/ApplicationMasterService.java
 ---
@@ -193,7 +200,91 @@ private EventHandler 
createEventHandler(TwillSpecification twillSpec) throws Cla
 
Preconditions.checkArgument(EventHandler.class.isAssignableFrom(handlerClass),
 "Class {} does not implements {}",
 handlerClass, 
EventHandler.class.getName());
-return Instances.newInstance((Class) 
handlerClass);
+final EventHandler delegate = Instances.newInstance((Class) handlerClass);
+if (delegate == null) {
+  // if no handler is specified, return an EventHandler with no-op
+  return new EventHandler() {};
+}
+// wrap the delegate EventHandler so that all errors will be caught
+return new EventHandler() {
--- End diff --

Originally, if `EventHandler#initialize` fails, the app fails to start. 
Do we want to change this behavior?
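
One possible answer, sketched under assumptions and not a decision made in
this thread: wrap only the lifecycle callbacks and leave `initialize`
unwrapped, so that a failing `initialize` still prevents the app from
starting. `Handler` and `SafeHandler` are hypothetical, reduced types, not
Twill's `EventHandler` API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced handler type; Twill's EventHandler has more methods.
abstract class Handler {
  void initialize() { }
  void started() { }
}

final class SafeHandler extends Handler {
  private final Handler delegate;
  // Collected instead of logged, to keep the sketch free of a logging dependency.
  final List<Exception> suppressed = new ArrayList<Exception>();

  SafeHandler(Handler delegate) {
    this.delegate = delegate;
  }

  @Override
  void initialize() {
    // Not wrapped: a failure here propagates, so the app still fails to start.
    delegate.initialize();
  }

  @Override
  void started() {
    try {
      delegate.started();
    } catch (Exception e) {
      // Lifecycle callbacks must not bring the application down.
      suppressed.add(e);
    }
  }
}
```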




[GitHub] twill issue #58: [TWILL-240] EventHandler Improvement

2017-08-09 Thread maochf
Github user maochf commented on the issue:

https://github.com/apache/twill/pull/58
  
Squashed.

