[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-01 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r261470344
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -330,6 +320,73 @@ public String getSystemName() {
     return systemName;
   }
 
+  @VisibleForTesting
+  static class KafkaStartpointRegistrationHandler implements StartpointVisitor {
+
+    private final Consumer kafkaConsumer;
+
+    KafkaStartpointRegistrationHandler(Consumer kafkaConsumer) {
+      this.kafkaConsumer = kafkaConsumer;
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset());
+      LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint);
+
+      // KafkaConsumer is not thread-safe.
+      synchronized (kafkaConsumer) {
+        kafkaConsumer.seek(topicPartition, offsetInStartpoint);
+      }
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+      Long timestampInStartpoint = startpointTimestamp.getTimestampOffset();
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      Map<TopicPartition, Long> topicPartitionsToTimeStamps = ImmutableMap.of(topicPartition, timestampInStartpoint);
+
+      // Look up the offset by timestamp.
+      LOG.info("Looking up the offsets of the topic partition: {} by timestamp: {}.", topicPartition, timestampInStartpoint);
+      Map<TopicPartition, OffsetAndTimestamp> topicPartitionToOffsetTimestamps = kafkaConsumer.offsetsForTimes(topicPartitionsToTimeStamps);
 
 Review comment:
   Fixed. Handled the null-case.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-01 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r261470344
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -330,6 +320,73 @@ public String getSystemName() {
     return systemName;
   }
 
+  @VisibleForTesting
+  static class KafkaStartpointRegistrationHandler implements StartpointVisitor {
+
+    private final Consumer kafkaConsumer;
+
+    KafkaStartpointRegistrationHandler(Consumer kafkaConsumer) {
+      this.kafkaConsumer = kafkaConsumer;
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset());
+      LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint);
+
+      // KafkaConsumer is not thread-safe.
+      synchronized (kafkaConsumer) {
+        kafkaConsumer.seek(topicPartition, offsetInStartpoint);
+      }
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+      Long timestampInStartpoint = startpointTimestamp.getTimestampOffset();
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      Map<TopicPartition, Long> topicPartitionsToTimeStamps = ImmutableMap.of(topicPartition, timestampInStartpoint);
+
+      // Look up the offset by timestamp.
+      LOG.info("Looking up the offsets of the topic partition: {} by timestamp: {}.", topicPartition, timestampInStartpoint);
+      Map<TopicPartition, OffsetAndTimestamp> topicPartitionToOffsetTimestamps = kafkaConsumer.offsetsForTimes(topicPartitionsToTimeStamps);
 
 Review comment:
   Will handle the null-case.




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-28 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r261468553
 
 

 ##
 File path: 
samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
 ##
 @@ -208,7 +223,11 @@ class SystemConsumers (
       if (startpoint != null) {
         consumer.register(systemStreamPartition, startpoint)
       } else {
-        consumer.register(systemStreamPartition, offset)
+        val existingOffset = sspToRegisteredOffsets.get(systemStreamPartition)
+        val systemAdmin = systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
+        if (existingOffset == null || systemAdmin.offsetComparator(existingOffset, offset) > 0) {
+          sspToRegisteredOffsets.put(systemStreamPartition, offset)
 
 Review comment:
   1. This happens in the broadcast stream scenario, where multiple tasks 
register the same SSP. This older-offset-comparator check is done by some of 
the SystemConsumer implementations but not by others. I'm moving it to a 
common layer, `SystemConsumers` above, to ensure functional correctness. More 
context [here](https://github.com/apache/samza/pull/918#discussion_r257523873).
   2. This matches the current behavior of all the system consumer 
implementations that support broadcast streams.
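The check above keeps, for each SSP, the lowest offset requested by any task. A minimal sketch of that bookkeeping in plain Java, assuming numeric Kafka-style offsets so that a hypothetical `Comparator<String>` can stand in for `systemAdmin.offsetComparator`; `LowestOffsetRegistry` and its methods are illustrative names, not Samza APIs:

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

public class LowestOffsetRegistry {
  // Stand-in for systemAdmin.offsetComparator; assumes offsets are numeric strings.
  private final Comparator<String> offsetComparator =
      (a, b) -> Long.compare(Long.parseLong(a), Long.parseLong(b));

  // SSP name -> lowest offset registered so far (stand-in for sspToRegisteredOffsets).
  private final Map<String, String> sspToRegisteredOffsets = new HashMap<>();

  /** Records the offset only if no lower offset is already registered for the SSP. */
  public void register(String ssp, String offset) {
    String existingOffset = sspToRegisteredOffsets.get(ssp);
    if (existingOffset == null || offsetComparator.compare(existingOffset, offset) > 0) {
      sspToRegisteredOffsets.put(ssp, offset);
    }
  }

  public String registeredOffset(String ssp) {
    return sspToRegisteredOffsets.get(ssp);
  }

  public static void main(String[] args) {
    LowestOffsetRegistry registry = new LowestOffsetRegistry();
    // Three tasks register the same broadcast SSP from different checkpoints.
    registry.register("kafka.broadcast.0", "120");
    registry.register("kafka.broadcast.0", "85");
    registry.register("kafka.broadcast.0", "300");
    System.out.println(registry.registeredOffset("kafka.broadcast.0")); // prints 85
  }
}
```

Comparing numerically matters here: compared as plain strings, "85" would sort after "120".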




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-28 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r261470344
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -330,6 +320,73 @@ public String getSystemName() {
     return systemName;
   }
 
+  @VisibleForTesting
+  static class KafkaStartpointRegistrationHandler implements StartpointVisitor {
+
+    private final Consumer kafkaConsumer;
+
+    KafkaStartpointRegistrationHandler(Consumer kafkaConsumer) {
+      this.kafkaConsumer = kafkaConsumer;
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset());
+      LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint);
+
+      // KafkaConsumer is not thread-safe.
+      synchronized (kafkaConsumer) {
+        kafkaConsumer.seek(topicPartition, offsetInStartpoint);
+      }
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+      Long timestampInStartpoint = startpointTimestamp.getTimestampOffset();
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      Map<TopicPartition, Long> topicPartitionsToTimeStamps = ImmutableMap.of(topicPartition, timestampInStartpoint);
+
+      // Look up the offset by timestamp.
+      LOG.info("Looking up the offsets of the topic partition: {} by timestamp: {}.", topicPartition, timestampInStartpoint);
+      Map<TopicPartition, OffsetAndTimestamp> topicPartitionToOffsetTimestamps = kafkaConsumer.offsetsForTimes(topicPartitionsToTimeStamps);
 
 Review comment:
   Good question. 
   Excerpt from the Kafka javadoc linked in the above comment:
   ```
   If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null will be returned for that partition
   ```
   
   The KafkaConsumer `offsetsForTimes` API would return null when used with 
Kafka brokers older than `0.10.0`. 
   
   Starting from 1.0, Samza uses kafka-clients version `0.11.0.2`. According 
to the 
[kafka-compatibility-matrix](https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix),
 the kafka-clients version used by Samza is not compatible with broker 
versions older than `0.10.0`. 
   
   Since Samza doesn't support Kafka brokers older than 0.10.0, I'm not sure 
we need to handle a null value returned by this API.
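For reference, the `@return` description of the same `offsetsForTimes` javadoc also states that null is returned for a partition when there is no message with a timestamp greater than or equal to the target timestamp, so a null entry can appear even against 0.10+ brokers (for example, a timestamp newer than every message). A minimal, Kafka-free sketch of a null-safe resolution step; plain `Long` values stand in for `OffsetAndTimestamp`, `TimestampStartpointResolver` is a hypothetical name, and falling back to the partition's end offset is an assumption, not something stated in this thread:

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampStartpointResolver {

  /**
   * Picks the offset to seek to for a timestamp startpoint.
   *
   * @param offsetsForTimes stand-in for the result of offsetsForTimes; a partition may map to null
   * @param endOffsets      stand-in for the partitions' end offsets (assumed fallback)
   */
  static long resolveStartingOffset(String partition,
                                    Map<String, Long> offsetsForTimes,
                                    Map<String, Long> endOffsets) {
    Long offset = offsetsForTimes.get(partition);
    if (offset != null) {
      return offset;
    }
    // Null-case: no timestamps in the messages, or no message at or after the timestamp.
    // Assumption: start from the end of the partition instead of failing with an NPE.
    Long endOffset = endOffsets.get(partition);
    if (endOffset == null) {
      throw new IllegalStateException("No end offset known for partition " + partition);
    }
    return endOffset;
  }

  public static void main(String[] args) {
    Map<String, Long> offsetsForTimes = new HashMap<>();
    offsetsForTimes.put("topic-0", 42L);
    offsetsForTimes.put("topic-1", null); // e.g. the timestamp is newer than every message
    Map<String, Long> endOffsets = new HashMap<>();
    endOffsets.put("topic-0", 100L);
    endOffsets.put("topic-1", 7L);

    System.out.println(resolveStartingOffset("topic-0", offsetsForTimes, endOffsets)); // prints 42
    System.out.println(resolveStartingOffset("topic-1", offsetsForTimes, endOffsets)); // prints 7
  }
}
```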




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-28 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r258333602
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -258,40 +260,28 @@ public void stop() {
    */
   @Override
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
+    register(systemStreamPartition, new StartpointSpecific(offset));
+  }
+
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
     if (started.get()) {
-      String msg = String.format("%s: Trying to register partition after consumer has been started. ssp=%s", this,
-          systemStreamPartition);
-      throw new SamzaException(msg);
+      String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
 
 Review comment:
   This was not introduced as a part of this patch. However, I agree that this 
is unnecessary. Fixed.




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-28 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r258333559
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -159,30 +179,16 @@ private void startSubscription() {
    */
   void startConsumer() {
     // set the offset for each TopicPartition
-    if (topicPartitionsToOffset.size() <= 0) {
+    if (topicPartitionToStartpointMap.size() <= 0) {
       LOG.error ("{}: Consumer is not subscribed to any SSPs", this);
     }
 
-    topicPartitionsToOffset.forEach((tp, startingOffsetString) -> {
-      long startingOffset = Long.valueOf(startingOffsetString);
-
-      try {
-        synchronized (kafkaConsumer) {
-          kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value
-        }
-      } catch (Exception e) {
-        // all recoverable execptions are handled by the client.
-        // if we get here there is nothing left to do but bail out.
-        String msg =
-            String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, tp);
-        LOG.error(msg, e);
-        throw new SamzaException(msg, e);
-      }
-
-      LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString);
-
+    topicPartitionToStartpointMap.forEach((topicPartition, startpoint) -> {
+      Partition partition = new Partition(topicPartition.partition());
+      SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, topicPartition.topic(), partition);
+      startpoint.apply(systemStreamPartition, kafkaStartpointRegistrationHandler);
       // add the partition to the proxy
-      proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset);
+      proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
 
 Review comment:
   > Looks like the previous comment about this got marked as "outdated".
   
   Yes, that's why I missed it as well. 
   
   I agree with your suggestion. Moved.
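The new `startConsumer` loop quoted above relies on double dispatch: `startpoint.apply(ssp, visitor)` calls back the `visit` overload that matches the startpoint's concrete type, so the Kafka-specific seek logic lives in `KafkaStartpointRegistrationHandler`. A simplified, self-contained sketch of that shape; the `Startpoint`, `StartpointSpecific`, `StartpointTimestamp`, and `StartpointVisitor` types below are cut-down stand-ins written for illustration (not Samza's actual definitions), SSPs are plain strings, and `RecordingHandler` is hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StartpointVisitorSketch {

  // Cut-down stand-in for the visitor interface: one overload per startpoint type.
  interface StartpointVisitor {
    void visit(String ssp, StartpointSpecific startpoint);
    void visit(String ssp, StartpointTimestamp startpoint);
  }

  abstract static class Startpoint {
    // Each subclass calls the visit overload for its own static type (double dispatch).
    abstract void apply(String ssp, StartpointVisitor visitor);
  }

  static final class StartpointSpecific extends Startpoint {
    final String specificOffset;
    StartpointSpecific(String specificOffset) { this.specificOffset = specificOffset; }
    @Override void apply(String ssp, StartpointVisitor visitor) { visitor.visit(ssp, this); }
  }

  static final class StartpointTimestamp extends Startpoint {
    final long timestamp;
    StartpointTimestamp(long timestamp) { this.timestamp = timestamp; }
    @Override void apply(String ssp, StartpointVisitor visitor) { visitor.visit(ssp, this); }
  }

  // Hypothetical handler that records what a consumer would be asked to do.
  static final class RecordingHandler implements StartpointVisitor {
    final List<String> actions = new ArrayList<>();

    @Override public void visit(String ssp, StartpointSpecific startpoint) {
      actions.add("seek " + ssp + " to offset " + Long.parseLong(startpoint.specificOffset));
    }

    @Override public void visit(String ssp, StartpointTimestamp startpoint) {
      actions.add("look up offset of " + ssp + " at timestamp " + startpoint.timestamp);
    }
  }

  public static void main(String[] args) {
    RecordingHandler handler = new RecordingHandler();
    List<Startpoint> startpoints =
        Arrays.asList(new StartpointSpecific("10"), new StartpointTimestamp(1551398400000L));
    for (Startpoint startpoint : startpoints) {
      startpoint.apply("kafka.topic.0", handler);
    }
    // prints "seek kafka.topic.0 to offset 10", then
    // "look up offset of kafka.topic.0 at timestamp 1551398400000"
    handler.actions.forEach(System.out::println);
  }
}
```

Adding a new startpoint type in this shape means adding one `visit` overload, which forces every visitor implementation to decide how to handle it at compile time.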




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-19 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257523191
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
     metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
 
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
 
 Review comment:
   Yes. The newly introduced `register(ssp, startpoint)` API provides 
guarantees that are a superset of those of the existing `register(ssp, offset)` 
API in SystemConsumer.
   After 1.0.1, we will eventually remove the existing `register(ssp, offset)` 
API. 
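One way to keep the legacy `register(ssp, offset)` entry point while routing everything through the startpoint path is to wrap the offset in a specific-offset startpoint, as the KafkaSystemConsumer diff earlier in this thread does with `new StartpointSpecific(offset)`. A minimal sketch with hypothetical, simplified types (`StartpointSpecific` here only holds an offset string, `SketchConsumer` is not a Samza class, and marking the legacy method `@Deprecated` is an assumption about how the planned removal could be signposted):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LegacyRegisterDelegationSketch {

  // Simplified stand-in for a specific-offset startpoint.
  static final class StartpointSpecific {
    final String specificOffset;
    StartpointSpecific(String specificOffset) { this.specificOffset = specificOffset; }
    @Override public String toString() { return "StartpointSpecific(" + specificOffset + ")"; }
  }

  // Hypothetical consumer that stores one startpoint per SSP until start().
  static final class SketchConsumer {
    final Map<String, StartpointSpecific> sspToStartpoint = new LinkedHashMap<>();
    private boolean started = false;

    /** Legacy API: kept for compatibility and delegated to the startpoint-based API. */
    @Deprecated
    void register(String ssp, String offset) {
      register(ssp, new StartpointSpecific(offset));
    }

    void register(String ssp, StartpointSpecific startpoint) {
      if (started) {
        throw new IllegalStateException("Consumer already started; cannot register " + ssp);
      }
      sspToStartpoint.put(ssp, startpoint);
    }

    void start() { started = true; }
  }

  public static void main(String[] args) {
    SketchConsumer consumer = new SketchConsumer();
    consumer.register("kafka.topic.0", "42");                        // legacy path
    consumer.register("kafka.topic.1", new StartpointSpecific("7")); // startpoint path
    consumer.start();
    // prints {kafka.topic.0=StartpointSpecific(42), kafka.topic.1=StartpointSpecific(7)}
    System.out.println(consumer.sspToStartpoint);
  }
}
```

Because both entry points end in the same method, the started-check and the per-SSP bookkeeping exist in exactly one place.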




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257522508
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -330,6 +387,73 @@ public String getSystemName() {
     return systemName;
   }
 
+  @VisibleForTesting
+  static class KafkaStartpointRegistrationHandler implements StartpointVisitor {
+
+    private final Consumer kafkaConsumer;
+
+    KafkaStartpointRegistrationHandler(Consumer kafkaConsumer) {
+      this.kafkaConsumer = kafkaConsumer;
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset());
+      LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint);
+
+      // KafkaConsumer is not thread-safe.
+      synchronized (kafkaConsumer) {
+        kafkaConsumer.seek(topicPartition, offsetInStartpoint);
 
 Review comment:
   1. Upper layers that call this API already log any exception thrown from 
it.
   2. IMHO, wrapping the exception in a SamzaException and rethrowing it 
doesn't add much value.
   
   What do you think?




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257523191
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
     metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
 
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
 
 Review comment:
   Yes. The newly introduced `register(ssp, startpoint)` API provides 
guarantees that are a superset of those of the existing `register(ssp, offset)` 
API in SystemConsumer.
   After 1.0.1, we will remove the existing `register(ssp, offset)` method. 




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257523873
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -60,16 +74,22 @@
   private final Config config;
   private final boolean fetchThresholdBytesEnabled;
   private final KafkaSystemConsumerMetrics metrics;
+  private final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler;
 
   // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
   final KafkaConsumerMessageSink messageSink;
 
   // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
   // BlockingEnvelopMap's buffers.
-  final private KafkaConsumerProxy proxy;
+  @VisibleForTesting
+  KafkaConsumerProxy proxy;
 
   // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets
   final Map<TopicPartition, String> topicPartitionsToOffset = new HashMap<>();
 
 Review comment:
   I was aware that we'd need to make this change. My initial plan was to do 
it after migrating all the system consumers to implement at least the 
startpoint-specific visitor. 
   
   We discussed this offline and agreed to make this change in this patch 
itself. 
   1. Removed `topicPartitionsToOffset` and delegated `register(ssp, offset)` 
API calls to the startpoint registration API.
   2. Moved the offset comparator logic to the `SystemConsumers` layer. 
 A. Only a few implementations of the SystemConsumer.register API 
currently use systemAdmin.offsetComparator to compare offsets. However, the 
comparator logic is common to all implementations of the 
SystemConsumer.register API. Moving it to the upper layer (`SystemConsumers`) 
ensures functional correctness.
 B. All upper layers other than `SystemConsumers` that invoke the 
register(ssp, offset) API already pass the lowest offset for an SSP. We don't 
have a startpoint comparator abstraction yet, so to delegate calls from 
register(ssp, offset) to the startpoint registration API, we had to compute the 
lowest offset for an SSP in the `SystemConsumers` layer.




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257524023
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -141,11 +166,14 @@ public void start() {
 
   private void startSubscription() {
     //subscribe to all the registered TopicPartitions
-    LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet());
+    Set<TopicPartition> registeredTopicPartitions = new HashSet<>();
+    registeredTopicPartitions.addAll(topicPartitionsToSSP.keySet());
+    registeredTopicPartitions.addAll(topicPartitionToStartpointMap.keySet());
 
 Review comment:
   This is no longer necessary, since we agreed offline to make the 
register(ssp, offset) API delegate to the startpoint registration API.




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257074119
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
     metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
 
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+    if (started.get()) {
+      String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+      throw new SamzaException(exceptionMessage);
+    }
+
+    if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
+      LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
+      return;
+    }
+
+    LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
+
+    super.register(systemStreamPartition, startpoint);
+
+    TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+    topicPartitionToStartpointMap.put(topicPartition, startpoint);
 
 Review comment:
   We discussed this offline before: having a startpoint comparator to choose 
the lowest startpoint when multiple startpoints are defined for an SSP. 
   
   Example: broadcast streams, where Task-1 defines a lower startpoint and 
Task-2 defines a higher startpoint for the same SSP.
   
   This is already tracked in SAMZA-2088.
   




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257523065
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -205,6 +215,156 @@ public void testFetchThresholdBytesDiabled() {
     consumer.stop();
   }
 
+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Integer testPartitionId = 0;
 
 Review comment:
   Done. 




[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257523115
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -205,6 +215,156 @@ public void testFetchThresholdBytesDiabled() {
     consumer.stop();
   }
 
+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Integer testPartitionId = 0;
+    final String offset = "0";
+    final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+    final Partition testPartition = new Partition(testPartitionId);
+    final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    KafkaStartpointRegistrationHandler
+        kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+    final StartpointSpecific testStartpointSpecific = new StartpointSpecific(offset);
+
+    // Mock the consumer interactions.
+    Mockito.doNothing().when(consumer).seek(testTopicPartition, Long.valueOf(offset));
+
+    // Invoke the consumer with startpoint.
+    kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, testStartpointSpecific);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(testTopicPartition, Long.valueOf(offset));
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Integer testPartitionId = 0;
+    final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+    final Partition testPartition = new Partition(testPartitionId);
+    final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+    final Long testTimeStamp = 10L;
+    final String testOffset = "10";
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    KafkaStartpointRegistrationHandler
+        kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+    final Map offsetForTimesResult = ImmutableMap.of(testTopicPartition, new OffsetAndTimestamp(Long.valueOf(testOffset), testTimeStamp));
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+    Mockito.doNothing().when(consumer).seek(testTopicPartition, Long.valueOf(testOffset));
+
+    kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, startpointTimestamp);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(testTopicPartition, Long.valueOf(testOffset));
+    Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+  }
+
+  @Test
+  public void testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Integer testPartitionId = 0;
+    final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+    final Partition testPartition = new Partition(testPartitionId);
+    final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaStartpointRegistrationHandler
+        kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+    final StartpointOldest testStartpointSpecific = new StartpointOldest();
+
+    // Mock the consumer interactions.
+    Mockito.doNothing().when(consumer).seekToBeginning(ImmutableList.of(testTopicPartition));
+
+    // Invoke the consumer with startpoint.
+    kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, testStartpointSpecific);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seekToBeginning(ImmutableList.of(testTopicPartition));
+  }
+
+  @Test
+  public void testStartpointUpcomingVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Integer testPartitionId = 0;
+    final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+    final Partition testPartition = new Partition(testPartitionId);
+    final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaSystemConsumer.KafkaStartpointRegistrationHandler
+
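The tests in the hunk above drive `KafkaStartpointRegistrationHandler` through its `visit` overloads and verify which consumer call each startpoint type produces. The sketch below models that double dispatch without Samza or Kafka. Every type in it is a hypothetical stand-in, the consumer is replaced by a list of recorded calls, and two behaviours are assumptions not shown in this excerpt: an upcoming startpoint maps to a seek to the end, and a timestamp with no matching offset also falls back to the end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free model of the double dispatch behind
// KafkaStartpointRegistrationHandler. None of these types are the real
// Samza or Kafka classes; topic partitions are plain "stream-partition" strings.
interface StartpointVisitorSketch {
  void visit(String topicPartition, SpecificSketch startpoint);
  void visit(String topicPartition, TimestampSketch startpoint);
  void visit(String topicPartition, OldestSketch startpoint);
  void visit(String topicPartition, UpcomingSketch startpoint);
}

abstract class StartpointSketch {
  // Each subclass passes `this`, so the compiler picks the matching visit overload.
  abstract void apply(String topicPartition, StartpointVisitorSketch visitor);
}

final class SpecificSketch extends StartpointSketch {
  final long offset;
  SpecificSketch(long offset) { this.offset = offset; }
  @Override void apply(String tp, StartpointVisitorSketch v) { v.visit(tp, this); }
}

final class TimestampSketch extends StartpointSketch {
  final long timestamp;
  TimestampSketch(long timestamp) { this.timestamp = timestamp; }
  @Override void apply(String tp, StartpointVisitorSketch v) { v.visit(tp, this); }
}

final class OldestSketch extends StartpointSketch {
  @Override void apply(String tp, StartpointVisitorSketch v) { v.visit(tp, this); }
}

final class UpcomingSketch extends StartpointSketch {
  @Override void apply(String tp, StartpointVisitorSketch v) { v.visit(tp, this); }
}

final class RecordingHandler implements StartpointVisitorSketch {
  // Consumer calls in request order, playing the role of Mockito.verify in the tests.
  final List<String> calls = new ArrayList<>();
  // Fake exact-match timestamp -> offset table standing in for offsetsForTimes.
  private final Map<Long, Long> offsetsByTimestamp;

  RecordingHandler(Map<Long, Long> offsetsByTimestamp) {
    this.offsetsByTimestamp = offsetsByTimestamp;
  }

  @Override
  public void visit(String tp, SpecificSketch s) {
    calls.add("seek " + tp + " " + s.offset);
  }

  @Override
  public void visit(String tp, TimestampSketch s) {
    Long offset = offsetsByTimestamp.get(s.timestamp);
    // Kafka's offsetsForTimes yields null for a partition with no record at or
    // after the timestamp; seeking to the end in that case is an assumption here.
    calls.add(offset == null ? "seekToEnd " + tp : "seek " + tp + " " + offset);
  }

  @Override
  public void visit(String tp, OldestSketch s) {
    calls.add("seekToBeginning " + tp);
  }

  @Override
  public void visit(String tp, UpcomingSketch s) {
    // Assumption: the truncated upcoming test above would verify a seek to the end.
    calls.add("seekToEnd " + tp);
  }
}
```

Because `apply` is defined in each subclass, the handler never needs `instanceof` checks; adding a new startpoint type forces a new `visit` overload at compile time.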

[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-02-17 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257072998
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
     metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
 
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+    if (started.get()) {
+      String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+      throw new SamzaException(exceptionMessage);
+    }
+
+    if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
+      LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
+      return;
+    }
+
+    LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
+
+    super.register(systemStreamPartition, startpoint);
 
 Review comment:
   It already does; that was added as part of this PR.
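The registration rules in the `register(SystemStreamPartition, Startpoint)` hunk (reject registration after start, ignore SSPs of other systems, remember one startpoint per topic partition) can be modelled without Samza or Kafka. This is a hypothetical sketch: `StartpointRegistrySketch`, its string startpoints and the `"stream-partition"` keys are stand-ins, `IllegalStateException` replaces `SamzaException`, and the `super.register(...)` call is not modelled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the register(ssp, startpoint) guards in KafkaSystemConsumer.
// Startpoints are plain strings; "stream-partition" keys stand in for TopicPartition.
class StartpointRegistrySketch {
  private final String systemName;
  private final AtomicBoolean started = new AtomicBoolean(false);
  private final Map<String, String> topicPartitionToStartpoint = new HashMap<>();

  StartpointRegistrySketch(String systemName) {
    this.systemName = systemName;
  }

  void start() {
    started.set(true);
  }

  /** Returns true if the startpoint was recorded, false if the SSP belongs to another system. */
  boolean register(String system, String stream, int partition, String startpoint) {
    if (started.get()) {
      // Registering after start is an error; the real code throws SamzaException.
      throw new IllegalStateException(String.format(
          "Consumer for %s had started. Registration of %s-%d failed.", systemName, stream, partition));
    }
    if (!systemName.equals(system)) {
      // SSPs of other systems are skipped (the real code logs a warning).
      return false;
    }
    // put() keeps only the most recent startpoint for a partition.
    topicPartitionToStartpoint.put(stream + "-" + partition, startpoint);
    return true;
  }

  Map<String, String> registered() {
    return topicPartitionToStartpoint;
  }
}
```

As with `topicPartitionToStartpointMap.put` in the hunk, a later registration for the same partition replaces an earlier one, which is exactly the multiple-startpoint case raised for broadcast streams.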

