[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-19 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r258302217
 
 

 ##
 File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
 ##
 @@ -208,7 +223,11 @@ class SystemConsumers (
   if (startpoint != null) {
 consumer.register(systemStreamPartition, startpoint)
   } else {
-consumer.register(systemStreamPartition, offset)
+val existingOffset = sspToRegisteredOffsets.get(systemStreamPartition)
+val systemAdmin = systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
+if (existingOffset == null || systemAdmin.offsetComparator(existingOffset, offset) > 0) {
+  sspToRegisteredOffsets.put(systemStreamPartition, offset)
 
 Review comment:
   It seems conceptually a bit odd to call `SystemConsumer.register` in 
`SystemConsumers.start`. If `SystemConsumer.register` works with multiple calls 
(so it just registers the offset from the latest call to `register`), then you 
could call `register` here, and then you wouldn't need to call it in `start`.
   Not sure if this is feasible, but I think it would make the flow a bit 
cleaner, since you wouldn't have to worry about doing anything extra in `start`.
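A minimal sketch of the last-call-wins contract described above, assuming `register` may be called repeatedly before `start` and simply overwrites earlier registrations. `LatestWinsConsumer` and its `String` SSP/offset parameters are hypothetical stand-ins, not Samza's `SystemConsumer` interface:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a SystemConsumer whose register() may be called
// several times before start(); the last registered offset per SSP wins.
class LatestWinsConsumer {
  private final Map<String, String> registeredOffsets = new HashMap<>();
  private boolean started = false;

  void register(String ssp, String offset) {
    if (started) {
      throw new IllegalStateException("Cannot register " + ssp + " after start()");
    }
    // Overwrites any earlier registration for the same SSP.
    registeredOffsets.put(ssp, offset);
  }

  void start() {
    // Nothing to reconcile here: the map already holds the final offsets,
    // which a real consumer would now seek to.
    started = true;
  }

  String registeredOffset(String ssp) {
    return registeredOffsets.get(ssp);
  }
}
```

Under this assumption, `SystemConsumers` could decide which offset to register at the call site, and `start` would only consume whatever the map holds at that point.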


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-19 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r258284619
 
 

 ##
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -258,40 +260,28 @@ public void stop() {
*/
   @Override
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
+register(systemStreamPartition, new StartpointSpecific(offset));
+  }
+
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
 if (started.get()) {
-  String msg = String.format("%s: Trying to register partition after consumer has been started. ssp=%s", this,
-  systemStreamPartition);
-  throw new SamzaException(msg);
+  String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
 
 Review comment:
   Minor: Is the class name at the beginning of the message necessary? I think 
log4j will already give you that, and any classes which extend this class will 
have a slightly misleading message.
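If the prefix is dropped, the `%s` that already formats `this` can carry the type name, provided `toString()` reports the runtime class. A hypothetical sketch (`BaseConsumer`, `SubclassedConsumer` and `alreadyStartedMessage` are illustrative names, not code from the PR):

```java
// Hypothetical base consumer: no class name is hard-coded in the message.
class BaseConsumer {
  private final String systemName;

  BaseConsumer(String systemName) {
    this.systemName = systemName;
  }

  @Override
  public String toString() {
    // Resolves to the runtime class, so subclasses are named correctly.
    return getClass().getSimpleName() + "[" + systemName + "]";
  }

  String alreadyStartedMessage(String ssp, String startpoint) {
    // `this` already identifies the consumer via toString().
    return String.format("%s has already started. Registration of ssp: %s, startpoint: %s failed.",
        this, ssp, startpoint);
  }
}

class SubclassedConsumer extends BaseConsumer {
  SubclassedConsumer(String systemName) {
    super(systemName);
  }
}
```

A subclass then reports its own name without any change to the message format.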


[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-19 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r258283701
 
 

 ##
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -159,30 +179,16 @@ private void startSubscription() {
*/
   void startConsumer() {
 // set the offset for each TopicPartition
-if (topicPartitionsToOffset.size() <= 0) {
+if (topicPartitionToStartpointMap.size() <= 0) {
   LOG.error ("{}: Consumer is not subscribed to any SSPs", this);
 }
 
-topicPartitionsToOffset.forEach((tp, startingOffsetString) -> {
-  long startingOffset = Long.valueOf(startingOffsetString);
-
-  try {
-synchronized (kafkaConsumer) {
-  kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value
-}
-  } catch (Exception e) {
-// all recoverable execptions are handled by the client.
-// if we get here there is nothing left to do but bail out.
-String msg =
-String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, tp);
-LOG.error(msg, e);
-throw new SamzaException(msg, e);
-  }
-
-  LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString);
-
+topicPartitionToStartpointMap.forEach((topicPartition, startpoint) -> {
+  Partition partition = new Partition(topicPartition.partition());
+  SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, topicPartition.topic(), partition);
+  startpoint.apply(systemStreamPartition, kafkaStartpointRegistrationHandler);
   // add the partition to the proxy
-  proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset);
+  proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
 
 Review comment:
   Looks like the previous comment about this got marked as "outdated".
   Can this be moved into the registration handler? Seems like this 
functionality would fall into the responsibility of that object.
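A rough sketch of the handler taking over the proxy bookkeeping. `SeekableConsumer` and `PartitionProxy` are hypothetical stand-ins for `KafkaConsumer` and `KafkaConsumerProxy`, and `visitSpecific` stands in for the `StartpointSpecific` visit; topic partitions and SSPs are plain strings to keep it self-contained:

```java
// Hypothetical minimal stand-in for KafkaConsumer.
interface SeekableConsumer {
  void seek(String topicPartition, long offset);

  long position(String topicPartition);
}

// Hypothetical minimal stand-in for KafkaConsumerProxy.
interface PartitionProxy {
  void addTopicPartition(String ssp, long nextOffset);
}

// Sketch: the handler seeks and then tells the proxy where fetching starts,
// so startConsumer() only has to dispatch each startpoint to it.
class RegistrationHandler {
  private final SeekableConsumer consumer;
  private final PartitionProxy proxy;

  RegistrationHandler(SeekableConsumer consumer, PartitionProxy proxy) {
    this.consumer = consumer;
    this.proxy = proxy;
  }

  void visitSpecific(String ssp, String topicPartition, String offset) {
    // The consumer is not thread-safe, so both calls are serialized on it.
    synchronized (consumer) {
      consumer.seek(topicPartition, Long.parseLong(offset));
      proxy.addTopicPartition(ssp, consumer.position(topicPartition));
    }
  }
}
```

`startConsumer` would then only build the SSP and call `startpoint.apply(...)`, with no proxy call of its own.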


[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-19 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r258281900
 
 

 ##
 File path: samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
 ##
 @@ -38,6 +38,10 @@ public SystemAdmins(Config config) {
 this.systemAdminMap = systemConfig.getSystemAdmins();
   }
 
+  public SystemAdmins(Map systemAdminMap) {
 
 Review comment:
   Is this just for tests? Is it possible to avoid doing this? Could you make a 
mock `SystemAdmins` instead? Maybe just always have it return a mock 
`SystemAdmin`?


[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-19 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r258295838
 
 

 ##
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -60,16 +74,22 @@
   private final Config config;
   private final boolean fetchThresholdBytesEnabled;
   private final KafkaSystemConsumerMetrics metrics;
+  private final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler;
 
   // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
   final KafkaConsumerMessageSink messageSink;
 
   // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
   // BlockingEnvelopMap's buffers.
-  final private KafkaConsumerProxy proxy;
+  @VisibleForTesting
+  KafkaConsumerProxy proxy;
 
   // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets
   final Map topicPartitionsToOffset = new HashMap<>();
 
 Review comment:
   2A. I agree that it is odd for specific `SystemConsumer` implementations to 
each compare offsets. It seems outside of the scope of this PR though. Can we 
move that comparator logic refactoring to a separate PR so that we can cleanly 
separate the changes? That PR could be committed before this PR.
   2B. I looked at some other code which uses `register` (e.g. 
`ContainerStorageManager`, `CoordinatorStreamStore`, and 
`CoordinatorStreamSystemConsumer`). Those don't really seem to do any 
comparisons to make sure they are registering the lowest offset. How can you 
tell that all of the upper layers already pass the lowest offset? I just want to 
double-check that there wasn't a bug in an upper layer whose effects happened to 
be masked by the offset comparator logic living directly in the `SystemConsumer` impls.
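For 2A, the rule from the `SystemConsumers` change (keep the registered offset unless the new one compares as older) could live in one shared helper. A sketch under that assumption; `OldestOffsetTracker` is hypothetical, and a plain `java.util.Comparator<String>` stands in for `SystemAdmin.offsetComparator`:

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

// Hypothetical shared helper: the "keep the oldest offset" rule lives in one
// place instead of in every SystemConsumer implementation.
class OldestOffsetTracker {
  private final Comparator<String> offsetComparator;
  private final Map<String, String> offsets = new HashMap<>();

  OldestOffsetTracker(Comparator<String> offsetComparator) {
    this.offsetComparator = offsetComparator;
  }

  // Mirrors: existingOffset == null || comparator(existingOffset, offset) > 0
  void register(String ssp, String offset) {
    String existing = offsets.get(ssp);
    if (existing == null || offsetComparator.compare(existing, offset) > 0) {
      offsets.put(ssp, offset);
    }
  }

  String offsetFor(String ssp) {
    return offsets.get(ssp);
  }
}
```

The comparator has to be numeric for Kafka offsets: compared as strings, `"42"` would sort after `"100"`.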


[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-19 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r258286825
 
 

 ##
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
 metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
 
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+if (started.get()) {
+  String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+  throw new SamzaException(exceptionMessage);
+}
+
+if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
+  LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
+  return;
+}
+
+LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
+
+super.register(systemStreamPartition, startpoint);
+
+TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+topicPartitionToStartpointMap.put(topicPartition, startpoint);
 
 Review comment:
   I was thinking of the case where `existingOffset` corresponds to the current 
checkpoint, and someone tries to set the offset to something after the current 
checkpoint. It looks like this code prevents someone from explicitly setting an 
offset to something after the `existingOffset`.
   Now that you added the offset check to `SystemConsumers`, it looks like the 
removed check is handled there instead.


[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-14 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r257073732
 
 

 ##
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
 metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
 
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+if (started.get()) {
+  String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+  throw new SamzaException(exceptionMessage);
+}
+
+if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
+  LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
+  return;
+}
+
+LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
+
+super.register(systemStreamPartition, startpoint);
 
 Review comment:
   Oops, I lost track of your change to `BlockingEnvelopeMap` above. Ignore the 
above comment.


[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-14 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r257063620
 
 

 ##
 File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -205,6 +215,156 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Integer testPartitionId = 0;
+final String offset = "0";
+final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+final Partition testPartition = new Partition(testPartitionId);
+final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+KafkaStartpointRegistrationHandler
+kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+final StartpointSpecific testStartpointSpecific = new StartpointSpecific(offset);
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seek(testTopicPartition, Long.valueOf(offset));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(testTopicPartition, Long.valueOf(offset));
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Integer testPartitionId = 0;
+final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+final Partition testPartition = new Partition(testPartitionId);
+final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+final Long testTimeStamp = 10L;
+final String testOffset = "10";
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+KafkaStartpointRegistrationHandler
+kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+final Map offsetForTimesResult = ImmutableMap.of(testTopicPartition, new OffsetAndTimestamp(Long.valueOf(testOffset), testTimeStamp));
+
+// Mock the consumer interactions.
+Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+Mockito.doNothing().when(consumer).seek(testTopicPartition, Long.valueOf(testOffset));
+
+kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, startpointTimestamp);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(testTopicPartition, Long.valueOf(testOffset));
+Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+  }
+
+  @Test
+  public void testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Integer testPartitionId = 0;
+final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+final Partition testPartition = new Partition(testPartitionId);
+final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaStartpointRegistrationHandler
+kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+final StartpointOldest testStartpointSpecific = new StartpointOldest();
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seekToBeginning(ImmutableList.of(testTopicPartition));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seekToBeginning(ImmutableList.of(testTopicPartition));
+  }
+
+  @Test
+  public void testStartpointUpcomingVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Integer testPartitionId = 0;
+final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+final Partition testPartition = new Partition(testPartitionId);
+final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaSystemConsumer.KafkaStartpointRegistrationHandler
+ 

[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-14 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r257061763
 
 

 ##
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
 metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
 
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+if (started.get()) {
+  String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+  throw new SamzaException(exceptionMessage);
+}
+
+if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
+  LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
+  return;
+}
+
+LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
+
+super.register(systemStreamPartition, startpoint);
+
+TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+topicPartitionToStartpointMap.put(topicPartition, startpoint);
 
 Review comment:
   There is logic in the old `register` which checks that the requested offset 
is older than the existing offset, in order to avoid missing messages. Just 
double-checking: do we intentionally want to leave that out?


[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-14 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r257062873
 
 

 ##
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
 metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
 
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+if (started.get()) {
+  String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+  throw new SamzaException(exceptionMessage);
+}
+
+if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
+  LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
+  return;
+}
+
+LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
+
+super.register(systemStreamPartition, startpoint);
 
 Review comment:
   Does `BlockingEnvelopeMap` implement this `register`? Seems like this will 
throw an exception.
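How Java would resolve that `super.register` call, assuming the interface supplies a default that throws and the intermediate class does not override it. All three types are hypothetical simplifications of `SystemConsumer`, `BlockingEnvelopeMap` and `KafkaSystemConsumer`:

```java
// Assumption: the Startpoint-based register is an interface default that throws.
interface StartpointAwareConsumer {
  default void register(String ssp, Object startpoint) {
    throw new UnsupportedOperationException("register(ssp, startpoint) is not implemented");
  }
}

abstract class EnvelopeMapBase implements StartpointAwareConsumer {
  // Deliberately does not override register(String, Object).
}

class StartpointConsumer extends EnvelopeMapBase {
  @Override
  public void register(String ssp, Object startpoint) {
    // Dispatches to the default inherited through EnvelopeMapBase, which throws.
    super.register(ssp, startpoint);
  }
}
```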


[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-14 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r257058746
 
 

 ##
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -141,11 +166,14 @@ public void start() {
 
   private void startSubscription() {
 //subscribe to all the registered TopicPartitions
-LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet());
+Set registeredTopicPartitions = new HashSet<>();
+registeredTopicPartitions.addAll(topicPartitionsToSSP.keySet());
+registeredTopicPartitions.addAll(topicPartitionToStartpointMap.keySet());
 
 Review comment:
   What is the case where this has a `TopicPartition`, but 
`topicPartitionsToSSP` does not have it?
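If both maps are always filled by the same `register` call, the union is redundant. A small hypothetical check (not in the PR) that surfaces partitions present only in the startpoint map:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical diagnostic: keys registered with a startpoint but missing from
// the TopicPartition -> SSP map. An empty result for every registration path
// would mean the union in startSubscription() adds nothing.
final class RegistrationMaps {
  private RegistrationMaps() {}

  static <K> Set<K> onlyInStartpoints(Map<K, ?> topicPartitionsToSsp,
                                      Map<K, ?> topicPartitionToStartpoint) {
    Set<K> extra = new HashSet<>(topicPartitionToStartpoint.keySet());
    extra.removeAll(topicPartitionsToSsp.keySet());
    return extra;
  }
}
```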


[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-14 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r257061221
 
 

 ##
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -330,6 +387,73 @@ public String getSystemName() {
 return systemName;
   }
 
+  @VisibleForTesting
+  static class KafkaStartpointRegistrationHandler implements StartpointVisitor {
+
+private final Consumer kafkaConsumer;
+
+KafkaStartpointRegistrationHandler(Consumer kafkaConsumer) {
+  this.kafkaConsumer = kafkaConsumer;
+}
+
+@Override
+public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+  TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+  long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset());
+  LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint);
+
+  // KafkaConsumer is not thread-safe.
+  synchronized (kafkaConsumer) {
+kafkaConsumer.seek(topicPartition, offsetInStartpoint);
 
 Review comment:
   `kafkaConsumer.seek` is wrapped in a try-catch above. Should that be done 
here as well?
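A sketch of the same guard applied in the visitor, mirroring the removed `startConsumer` code. `Seeker` and `SeekFailedException` are hypothetical stand-ins for `KafkaConsumer` and `SamzaException`:

```java
// Hypothetical stand-in for KafkaConsumer's seek.
interface Seeker {
  void seek(String topicPartition, long offset);
}

// Hypothetical stand-in for SamzaException.
class SeekFailedException extends RuntimeException {
  SeekFailedException(String message, Throwable cause) {
    super(message, cause);
  }
}

final class SafeSeek {
  private SafeSeek() {}

  static void seek(Seeker consumer, String topicPartition, long offset) {
    try {
      // The consumer is not thread-safe, so seeks are serialized on it.
      synchronized (consumer) {
        consumer.seek(topicPartition, offset);
      }
    } catch (Exception e) {
      // Per the removed code's comment, recoverable exceptions are handled by
      // the client; anything reaching here is rethrown with context.
      String msg = String.format("Got exception while seeking to %d for partition %s", offset, topicPartition);
      throw new SeekFailedException(msg, e);
    }
  }
}
```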


[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-14 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r257063162
 
 

 ##
 File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -205,6 +215,156 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Integer testPartitionId = 0;
 
 Review comment:
   Maybe extract some of these dummy variables to static variables in the test 
class, so they can be shared with your other visitor tests?
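A sketch of what the shared fixtures might look like. The real fields would be `TopicPartition`, `Partition` and `SystemStreamPartition` instances (shown in the comment); plain strings with placeholder values stand in for them so the example compiles on its own:

```java
// Hypothetical shared fixtures for the visitor tests. In the real test class
// these would be objects such as
//   static final TopicPartition TEST_TOPIC_PARTITION = new TopicPartition(TEST_STREAM, TEST_PARTITION_ID);
//   static final SystemStreamPartition TEST_SSP =
//       new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(TEST_PARTITION_ID));
final class VisitorTestFixtures {
  private VisitorTestFixtures() {}

  static final String TEST_SYSTEM = "test-system"; // placeholder value
  static final String TEST_STREAM = "test-stream"; // placeholder value
  static final int TEST_PARTITION_ID = 0;
  static final String TEST_OFFSET = "10";
  static final long TEST_TIMESTAMP = 10L;

  // Derived once and shared by every visitor test.
  static final String TEST_TOPIC_PARTITION = TEST_STREAM + "-" + TEST_PARTITION_ID;
  static final String TEST_SSP = TEST_SYSTEM + "/" + TEST_STREAM + "/" + TEST_PARTITION_ID;
}
```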


[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-14 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r257062107
 
 

 ##
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
 metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
 
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
 
 Review comment:
   Do you plan to remove the other `register` method once 1.0.1 is released, or 
do you plan to support the other `register` for more time?
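If the offset-based overload is kept for a transition period, one option is to leave it as a thin deprecated adapter, as the diff already does with `new StartpointSpecific(offset)`. A hypothetical sketch with simplified stand-ins for `Startpoint` and `StartpointSpecific`:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for Samza's Startpoint and StartpointSpecific.
interface Startpoint {}

final class SpecificStartpoint implements Startpoint {
  final String offset;

  SpecificStartpoint(String offset) {
    this.offset = offset;
  }
}

// Hypothetical consumer: all registration state lives in one startpoint map.
class DualRegisterConsumer {
  final Map<String, Startpoint> startpoints = new HashMap<>();

  /** @deprecated use {@link #register(String, Startpoint)} instead. */
  @Deprecated
  void register(String ssp, String offset) {
    // Thin adapter: an explicit offset is just a specific startpoint.
    register(ssp, new SpecificStartpoint(offset));
  }

  void register(String ssp, Startpoint startpoint) {
    startpoints.put(ssp, startpoint);
  }
}
```

Removing the adapter later then touches only that one method, and a single startpoint map is enough to hold every registration.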


[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-14 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r257063321
 
 

 ##
 File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -205,6 +215,156 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Integer testPartitionId = 0;
+final String offset = "0";
+final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+final Partition testPartition = new Partition(testPartitionId);
+final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+KafkaStartpointRegistrationHandler
+kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+final StartpointSpecific testStartpointSpecific = new StartpointSpecific(offset);
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seek(testTopicPartition, Long.valueOf(offset));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(testTopicPartition, Long.valueOf(offset));
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Integer testPartitionId = 0;
+final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+final Partition testPartition = new Partition(testPartitionId);
+final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+final Long testTimeStamp = 10L;
+final String testOffset = "10";
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+KafkaStartpointRegistrationHandler
+kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+final Map offsetForTimesResult = ImmutableMap.of(testTopicPartition, new OffsetAndTimestamp(Long.valueOf(testOffset), testTimeStamp));
+
+// Mock the consumer interactions.
+Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+Mockito.doNothing().when(consumer).seek(testTopicPartition, Long.valueOf(testOffset));
+
+kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, startpointTimestamp);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(testTopicPartition, Long.valueOf(testOffset));
+Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+  }
+
+  @Test
+  public void testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Integer testPartitionId = 0;
+final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+final Partition testPartition = new Partition(testPartitionId);
+final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaStartpointRegistrationHandler
+kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+final StartpointOldest testStartpointSpecific = new StartpointOldest();
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seekToBeginning(ImmutableList.of(testTopicPartition));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seekToBeginning(ImmutableList.of(testTopicPartition));
+  }
+
+  @Test
+  public void testStartpointUpcomingVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Integer testPartitionId = 0;
+final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, 
testPartitionId);
+final Partition testPartition = new Partition(testPartitionId);
+final SystemStreamPartition testSystemStreamPartition = new 
SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaSystemConsumer.KafkaStartpointRegistrationHandler
+ 

[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-14 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r257058381

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -60,16 +74,22 @@
   private final Config config;
   private final boolean fetchThresholdBytesEnabled;
   private final KafkaSystemConsumerMetrics metrics;
+  private final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler;
 
   // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
   final KafkaConsumerMessageSink messageSink;
 
   // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
   // BlockingEnvelopeMap's buffers.
-  final private KafkaConsumerProxy proxy;
+  @VisibleForTesting
+  KafkaConsumerProxy proxy;
 
   // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets
   final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
 
 Review comment:
   Can we get rid of `topicPartitionsToOffset` and use `topicPartitionToStartpointMap` instead? If the plan was to do that later, after removing the offset-based registration, then maybe this PR can just wait until that is done.
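   A minimal sketch of the suggestion, assuming checkpointed offsets are folded into the startpoint map at registration time as offset-specific startpoints, so that `startConsumer` iterates a single map and the startpoint-over-checkpoint precedence is decided when each partition is registered. `SingleRegistrationMapSketch`, `Startpoint`, `SpecificOffset`, `Oldest`, `registerOffset`, and `registerStartpoint` are hypothetical simplifications, not Samza's actual startpoint classes or `KafkaSystemConsumer` methods, and topic partitions are modeled as plain strings.

```java
import java.util.HashMap;
import java.util.Map;

public class SingleRegistrationMapSketch {

  // Hypothetical, simplified stand-in for Samza's startpoint hierarchy.
  interface Startpoint {}

  // A checkpointed offset expressed as an offset-specific startpoint (hypothetical).
  static final class SpecificOffset implements Startpoint {
    final String offset;
    SpecificOffset(String offset) { this.offset = offset; }
    @Override public String toString() { return "SpecificOffset(" + offset + ")"; }
  }

  // Hypothetical stand-in for "start from the oldest available offset".
  static final class Oldest implements Startpoint {
    @Override public String toString() { return "Oldest"; }
  }

  // The single registration map: topic partition (as "topic-partition") -> startpoint.
  private final Map<String, Startpoint> topicPartitionToStartpoint = new HashMap<>();

  // Records a checkpointed offset only if nothing is registered yet for the
  // partition, so it never overrides a startpoint.
  void registerOffset(String topicPartition, String checkpointedOffset) {
    topicPartitionToStartpoint.putIfAbsent(topicPartition, new SpecificOffset(checkpointedOffset));
  }

  // A startpoint always replaces whatever was registered before it.
  void registerStartpoint(String topicPartition, Startpoint startpoint) {
    topicPartitionToStartpoint.put(topicPartition, startpoint);
  }

  Startpoint startpointFor(String topicPartition) {
    return topicPartitionToStartpoint.get(topicPartition);
  }

  public static void main(String[] args) {
    SingleRegistrationMapSketch consumer = new SingleRegistrationMapSketch();
    consumer.registerOffset("events-0", "42");
    consumer.registerStartpoint("events-0", new Oldest()); // startpoint wins
    consumer.registerStartpoint("events-1", new Oldest());
    consumer.registerOffset("events-1", "7");              // ignored: startpoint already present
    consumer.registerOffset("events-2", "100");            // checkpoint only
    System.out.println(consumer.startpointFor("events-0")); // Oldest
    System.out.println(consumer.startpointFor("events-1")); // Oldest
    System.out.println(consumer.startpointFor("events-2")); // SpecificOffset(100)
  }
}
```

   Using `putIfAbsent` for checkpoints and `put` for startpoints makes the precedence independent of the order in which the two registrations arrive. If the same partition can receive several checkpointed offsets, this sketch simply keeps the first one.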




[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-14 Thread GitBox
URL: https://github.com/apache/samza/pull/918#discussion_r257060720

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -163,26 +191,35 @@ void startConsumer() {
   LOG.error ("{}: Consumer is not subscribed to any SSPs", this);
 }
 
-topicPartitionsToOffset.forEach((tp, startingOffsetString) -> {
-  long startingOffset = Long.valueOf(startingOffsetString);
+// If both the startpoint and checkpointed offsets are present for a topic partition,
+// then precedence is given to the startpoint.
+topicPartitionToStartpointMap.forEach((topicPartition, startpoint) -> {
+  Partition partition = new Partition(topicPartition.partition());
+  SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, topicPartition.topic(), partition);
+  startpoint.apply(systemStreamPartition, kafkaStartpointRegistrationHandler);
+  // add the partition to the proxy
+  proxy.addTopicPartition(topicPartitionsToSSP.get(topicPartition), kafkaConsumer.position(topicPartition));
 
 Review comment:
   Can this be moved into the registration handler? It seems like this functionality falls under that object's responsibilities.
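   One way the suggestion could look, sketched under assumptions: the handler receives both the consumer and the proxy, and each visit method seeks and then calls `addTopicPartition` with the consumer's resulting position, so `startConsumer` only has to apply each startpoint. All types below (`Consumer`, `Proxy`, `Handler`, `FakeConsumer`, and the `Specific`/`Oldest`/`Upcoming` startpoints) are hypothetical simplifications loosely modeled on `KafkaConsumer`, `KafkaConsumerProxy`, and Samza's startpoints, not their real signatures; topic partitions are plain strings rather than `TopicPartition`/`SystemStreamPartition`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RegistrationHandlerSketch {

  // Hypothetical minimal view of the consumer operations the handler needs.
  interface Consumer {
    void seek(String topicPartition, long offset);
    void seekToBeginning(String topicPartition);
    void seekToEnd(String topicPartition);
    long position(String topicPartition);
  }

  // Hypothetical stand-in for KafkaConsumerProxy.addTopicPartition.
  interface Proxy {
    void addTopicPartition(String topicPartition, long nextOffset);
  }

  // Hypothetical startpoints with a visitor-style apply.
  interface Startpoint { void apply(String topicPartition, Handler handler); }

  static final class Specific implements Startpoint {
    final long offset;
    Specific(long offset) { this.offset = offset; }
    public void apply(String tp, Handler h) { h.visitSpecific(tp, this); }
  }

  static final class Oldest implements Startpoint {
    public void apply(String tp, Handler h) { h.visitOldest(tp); }
  }

  static final class Upcoming implements Startpoint {
    public void apply(String tp, Handler h) { h.visitUpcoming(tp); }
  }

  // Seeks the consumer, then registers the resulting position with the proxy,
  // so the caller no longer does the proxy bookkeeping itself.
  static final class Handler {
    private final Consumer consumer;
    private final Proxy proxy;
    Handler(Consumer consumer, Proxy proxy) { this.consumer = consumer; this.proxy = proxy; }

    void visitSpecific(String tp, Specific s) { consumer.seek(tp, s.offset); register(tp); }
    void visitOldest(String tp) { consumer.seekToBeginning(tp); register(tp); }
    void visitUpcoming(String tp) { consumer.seekToEnd(tp); register(tp); }

    private void register(String tp) { proxy.addTopicPartition(tp, consumer.position(tp)); }
  }

  // In-memory fake: the beginning is offset 0 and the end is a fixed log-end offset.
  static final class FakeConsumer implements Consumer {
    final long logEndOffset;
    final Map<String, Long> positions = new HashMap<>();
    FakeConsumer(long logEndOffset) { this.logEndOffset = logEndOffset; }
    public void seek(String tp, long offset) { positions.put(tp, offset); }
    public void seekToBeginning(String tp) { positions.put(tp, 0L); }
    public void seekToEnd(String tp) { positions.put(tp, logEndOffset); }
    public long position(String tp) { return positions.get(tp); }
  }

  public static void main(String[] args) {
    FakeConsumer consumer = new FakeConsumer(500L);
    List<String> added = new ArrayList<>();
    Proxy proxy = (tp, offset) -> added.add(tp + "@" + offset);
    Handler handler = new Handler(consumer, proxy);

    // The startConsumer() loop would reduce to applying each registered startpoint.
    new Specific(42L).apply("events-0", handler);
    new Oldest().apply("events-1", handler);
    new Upcoming().apply("events-2", handler);

    System.out.println(added); // [events-0@42, events-1@0, events-2@500]
  }
}
```

   Keeping the seek and the proxy registration together means every startpoint type reports its starting position the same way; the trade-off is that the handler now depends on the proxy as well as on the consumer.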

