[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r261470344 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ## @@ -330,6 +320,73 @@ public String getSystemName() { return systemName; } + @VisibleForTesting + static class KafkaStartpointRegistrationHandler implements StartpointVisitor { + +private final Consumer kafkaConsumer; + +KafkaStartpointRegistrationHandler(Consumer kafkaConsumer) { + this.kafkaConsumer = kafkaConsumer; +} + +@Override +public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) { + TopicPartition topicPartition = toTopicPartition(systemStreamPartition); + long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset()); + LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint); + + // KafkaConsumer is not thread-safe. + synchronized (kafkaConsumer) { +kafkaConsumer.seek(topicPartition, offsetInStartpoint); + } +} + +@Override +public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) { + Long timestampInStartpoint = startpointTimestamp.getTimestampOffset(); + TopicPartition topicPartition = toTopicPartition(systemStreamPartition); + Map topicPartitionsToTimeStamps = ImmutableMap.of(topicPartition, timestampInStartpoint); + + // Look up the offset by timestamp. + LOG.info("Looking up the offsets of the topic partition: {} by timestamp: {}.", topicPartition, timestampInStartpoint); + Map topicPartitionToOffsetTimestamps = kafkaConsumer.offsetsForTimes(topicPartitionsToTimeStamps); Review comment: Fixed. Handled the null-case. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r261470344 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ## @@ -330,6 +320,73 @@ public String getSystemName() { return systemName; } + @VisibleForTesting + static class KafkaStartpointRegistrationHandler implements StartpointVisitor { + +private final Consumer kafkaConsumer; + +KafkaStartpointRegistrationHandler(Consumer kafkaConsumer) { + this.kafkaConsumer = kafkaConsumer; +} + +@Override +public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) { + TopicPartition topicPartition = toTopicPartition(systemStreamPartition); + long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset()); + LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint); + + // KafkaConsumer is not thread-safe. + synchronized (kafkaConsumer) { +kafkaConsumer.seek(topicPartition, offsetInStartpoint); + } +} + +@Override +public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) { + Long timestampInStartpoint = startpointTimestamp.getTimestampOffset(); + TopicPartition topicPartition = toTopicPartition(systemStreamPartition); + Map topicPartitionsToTimeStamps = ImmutableMap.of(topicPartition, timestampInStartpoint); + + // Look up the offset by timestamp. + LOG.info("Looking up the offsets of the topic partition: {} by timestamp: {}.", topicPartition, timestampInStartpoint); + Map topicPartitionToOffsetTimestamps = kafkaConsumer.offsetsForTimes(topicPartitionsToTimeStamps); Review comment: Will handle the null-case. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r261468553

## File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ##
@@ -208,7 +223,11 @@ class SystemConsumers (
     if (startpoint != null) {
       consumer.register(systemStreamPartition, startpoint)
     } else {
-      consumer.register(systemStreamPartition, offset)
+      val existingOffset = sspToRegisteredOffsets.get(systemStreamPartition)
+      val systemAdmin = systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
+      if (existingOffset == null || systemAdmin.offsetComparator(existingOffset, offset) > 0) {
+        sspToRegisteredOffsets.put(systemStreamPartition, offset)

Review comment:
1. This would happen in the broadcast stream scenario. The older-offset comparator check is done by some of the SystemConsumer implementations but not by others. I'm moving it to a common layer, `SystemConsumers` (above), to ensure functional correctness. More context [here](https://github.com/apache/samza/pull/918#discussion_r257523873).
2. Currently, this behavior is what all the system consumer implementations that support broadcast streams do.
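For readers following the thread, the "keep the older offset" rule from the diff above can be sketched in isolation. This is only an illustration under stated assumptions: `LowestOffsetRegistry` is a hypothetical class, plain strings stand in for `SystemStreamPartition`, and offsets are assumed to be numeric strings so that a simple comparator can stand in for `systemAdmin.offsetComparator`. It is not the actual `SystemConsumers` code.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the bookkeeping discussed above; not the real SystemConsumers.
final class LowestOffsetRegistry {
  // A plain String key stands in for SystemStreamPartition.
  private final Map<String, String> sspToRegisteredOffsets = new HashMap<>();

  // Assumption: offsets are numeric strings. The real comparator is system-specific.
  private final Comparator<String> offsetComparator = Comparator.comparingLong(Long::parseLong);

  // Records the offset only if none is registered yet, or if the new offset is older
  // than the one already registered (same condition as in the diff).
  void register(String ssp, String offset) {
    String existingOffset = sspToRegisteredOffsets.get(ssp);
    if (existingOffset == null || offsetComparator.compare(existingOffset, offset) > 0) {
      sspToRegisteredOffsets.put(ssp, offset);
    }
  }

  // Returns the offset that would be handed to the underlying consumer, or null if none.
  String registeredOffset(String ssp) {
    return sspToRegisteredOffsets.get(ssp);
  }
}
```

In the broadcast stream scenario, several tasks may call `register` for the same SSP with different offsets; with this rule only the oldest one survives, regardless of call order.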
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r261470344

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##
@@ -330,6 +320,73 @@ public String getSystemName() {
     return systemName;
   }

+  @VisibleForTesting
+  static class KafkaStartpointRegistrationHandler implements StartpointVisitor {
+
+    private final Consumer kafkaConsumer;
+
+    KafkaStartpointRegistrationHandler(Consumer kafkaConsumer) {
+      this.kafkaConsumer = kafkaConsumer;
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset());
+      LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint);
+
+      // KafkaConsumer is not thread-safe.
+      synchronized (kafkaConsumer) {
+        kafkaConsumer.seek(topicPartition, offsetInStartpoint);
+      }
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
+      Long timestampInStartpoint = startpointTimestamp.getTimestampOffset();
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      Map topicPartitionsToTimeStamps = ImmutableMap.of(topicPartition, timestampInStartpoint);
+
+      // Look up the offset by timestamp.
+      LOG.info("Looking up the offsets of the topic partition: {} by timestamp: {}.", topicPartition, timestampInStartpoint);
+      Map topicPartitionToOffsetTimestamps = kafkaConsumer.offsetsForTimes(topicPartitionsToTimeStamps);

Review comment:
Good question. Excerpt from the Kafka javadoc linked in the above comment:
```
If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null will be returned for that partition
```
Null would be returned by the KafkaConsumer `offsetsForTimes` API when used with a Kafka broker version < `0.10.0`. Starting from 1.0, Samza supports kafka-clients version `0.11.0.2`. According to the [kafka-compatibility-matrix](https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix), the kafka-clients version used by Samza is not compatible with broker versions < `0.10.0`. Since Samza doesn't support Kafka brokers < `0.10.0`, I'm not sure we should handle the null value returned by this API.
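Since the thread later settles on handling the null case, here is a minimal sketch of what a null-safe lookup could look like. Everything in it is an assumption made for illustration: `TimestampLookupSketch` and `resolveOffset` are hypothetical names, a `Map<String, Long>` stands in for the `Map<TopicPartition, OffsetAndTimestamp>` returned by `offsetsForTimes`, and the choice of fallback (for example, seeking to the end of the partition) is deliberately left to the caller. It is not the code that was merged.

```java
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper; a Map<String, Long> stands in for the result of offsetsForTimes.
final class TimestampLookupSketch {

  // Returns the looked-up offset for the partition, or empty when the lookup produced
  // no entry (a null map, a missing key, or a null value for that partition).
  static OptionalLong resolveOffset(Map<String, Long> lookupResult, String partition) {
    if (lookupResult == null) {
      return OptionalLong.empty();
    }
    Long offset = lookupResult.get(partition);
    return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
  }
}
```

A caller would seek to the returned offset when it is present and otherwise apply whatever fallback the system considers appropriate, logging the reason so that the missing timestamp index is visible to operators.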
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r261470344 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ## @@ -330,6 +320,73 @@ public String getSystemName() { return systemName; } + @VisibleForTesting + static class KafkaStartpointRegistrationHandler implements StartpointVisitor { + +private final Consumer kafkaConsumer; + +KafkaStartpointRegistrationHandler(Consumer kafkaConsumer) { + this.kafkaConsumer = kafkaConsumer; +} + +@Override +public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) { + TopicPartition topicPartition = toTopicPartition(systemStreamPartition); + long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset()); + LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint); + + // KafkaConsumer is not thread-safe. + synchronized (kafkaConsumer) { +kafkaConsumer.seek(topicPartition, offsetInStartpoint); + } +} + +@Override +public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) { + Long timestampInStartpoint = startpointTimestamp.getTimestampOffset(); + TopicPartition topicPartition = toTopicPartition(systemStreamPartition); + Map topicPartitionsToTimeStamps = ImmutableMap.of(topicPartition, timestampInStartpoint); + + // Look up the offset by timestamp. + LOG.info("Looking up the offsets of the topic partition: {} by timestamp: {}.", topicPartition, timestampInStartpoint); + Map topicPartitionToOffsetTimestamps = kafkaConsumer.offsetsForTimes(topicPartitionsToTimeStamps); Review comment: Good question. Excerpt the kafka javadoc linked in the above comment: ``` If the message format version in a partition is before 0.10.0, i.e. 
the messages do not have timestamps, null will be returned for that partition ``` Null would be returned by the KafkaConsumer `offsetsForTimes` API when used with a Kafka broker version < `0.10.0`. Starting from 1.0, Samza supports kafka-clients version `0.11.0.2`. According to the [kafka-compatibility-matrix](https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix), the kafka-clients version used by Samza is not compatible with broker versions < `0.10.0`. Since Samza doesn't support Kafka brokers < `0.10.0`, I'm not sure we should handle the null value returned by this API.
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r261470344 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ## @@ -330,6 +320,73 @@ public String getSystemName() { return systemName; } + @VisibleForTesting + static class KafkaStartpointRegistrationHandler implements StartpointVisitor { + +private final Consumer kafkaConsumer; + +KafkaStartpointRegistrationHandler(Consumer kafkaConsumer) { + this.kafkaConsumer = kafkaConsumer; +} + +@Override +public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) { + TopicPartition topicPartition = toTopicPartition(systemStreamPartition); + long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset()); + LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint); + + // KafkaConsumer is not thread-safe. + synchronized (kafkaConsumer) { +kafkaConsumer.seek(topicPartition, offsetInStartpoint); + } +} + +@Override +public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) { + Long timestampInStartpoint = startpointTimestamp.getTimestampOffset(); + TopicPartition topicPartition = toTopicPartition(systemStreamPartition); + Map topicPartitionsToTimeStamps = ImmutableMap.of(topicPartition, timestampInStartpoint); + + // Look up the offset by timestamp. + LOG.info("Looking up the offsets of the topic partition: {} by timestamp: {}.", topicPartition, timestampInStartpoint); + Map topicPartitionToOffsetTimestamps = kafkaConsumer.offsetsForTimes(topicPartitionsToTimeStamps); Review comment: Good question. Excerpt the kafka javadoc linked in the above comment: ``` If the message format version in a partition is before 0.10.0, i.e. 
the messages do not have timestamps, null will be returned for that partition ``` Null would be returned by the KafkaConsumer `offsetsForTimes` API when used with a Kafka broker version < `0.10.0`. Starting from 1.0, Samza supports Kafka version `0.11.0.2`. According to the [kafka-compatibility-matrix](https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix), the kafka-clients version used by Samza is not compatible with broker versions < `0.10.0`. Since Samza doesn't support Kafka brokers < `0.10.0`, I'm not sure we should handle the null value returned by this API.
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r261468553

## File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ##
@@ -208,7 +223,11 @@ class SystemConsumers (
     if (startpoint != null) {
       consumer.register(systemStreamPartition, startpoint)
     } else {
-      consumer.register(systemStreamPartition, offset)
+      val existingOffset = sspToRegisteredOffsets.get(systemStreamPartition)
+      val systemAdmin = systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
+      if (existingOffset == null || systemAdmin.offsetComparator(existingOffset, offset) > 0) {
+        sspToRegisteredOffsets.put(systemStreamPartition, offset)

Review comment:
1. This would happen in the broadcast stream scenario. The older-offset comparator check is done by some of the SystemConsumer implementations but not by others. I'm moving it to a common layer, `SystemConsumers` (above), to ensure functional correctness. More context [here](https://github.com/apache/samza/pull/918#discussion_r257523873).
2. Currently, this odd behavior is what all the system consumer implementations that support broadcast streams do.
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r258333602 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ## @@ -258,40 +260,28 @@ public void stop() { */ @Override public void register(SystemStreamPartition systemStreamPartition, String offset) { +register(systemStreamPartition, new StartpointSpecific(offset)); + } + + @Override + public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) { if (started.get()) { - String msg = String.format("%s: Trying to register partition after consumer has been started. ssp=%s", this, - systemStreamPartition); - throw new SamzaException(msg); + String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint); Review comment: This was not introduced as a part of this patch. However, I agree that this is unnecessary. Fixed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r258333559 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ## @@ -159,30 +179,16 @@ private void startSubscription() { */ void startConsumer() { // set the offset for each TopicPartition -if (topicPartitionsToOffset.size() <= 0) { +if (topicPartitionToStartpointMap.size() <= 0) { LOG.error ("{}: Consumer is not subscribed to any SSPs", this); } -topicPartitionsToOffset.forEach((tp, startingOffsetString) -> { - long startingOffset = Long.valueOf(startingOffsetString); - - try { -synchronized (kafkaConsumer) { - kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value -} - } catch (Exception e) { -// all recoverable execptions are handled by the client. -// if we get here there is nothing left to do but bail out. -String msg = -String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, tp); -LOG.error(msg, e); -throw new SamzaException(msg, e); - } - - LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString); - +topicPartitionToStartpointMap.forEach((topicPartition, startpoint) -> { + Partition partition = new Partition(topicPartition.partition()); + SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, topicPartition.topic(), partition); + startpoint.apply(systemStreamPartition, kafkaStartpointRegistrationHandler); // add the partition to the proxy - proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset); + proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition)); Review comment: > Looks like the previous comment about this got marked as "outdated". Yes, that's the reason why i missed it as well. Agree with your suggestion. Moved. 
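The `startpoint.apply(systemStreamPartition, kafkaStartpointRegistrationHandler)` call in the diff above relies on double dispatch: each startpoint type hands itself to the matching `visit` overload. The sketch below shows that mechanism with simplified, hypothetical stand-ins (`StartpointVisitorSketch`, `SpecificStartpoint`, `TimestampStartpoint`, `RecordingHandler`); a string stands in for `SystemStreamPartition`, and the handler only records what a real handler would do against the consumer. These are not the Samza classes themselves.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified illustration of the visitor-based startpoint resolution.
final class StartpointVisitorSketch {

  // One visit overload per startpoint type.
  interface Visitor {
    void visit(String ssp, SpecificStartpoint startpoint);
    void visit(String ssp, TimestampStartpoint startpoint);
  }

  interface Startpoint {
    // Each implementation passes itself to the visitor, selecting the right overload.
    void apply(String ssp, Visitor visitor);
  }

  static final class SpecificStartpoint implements Startpoint {
    final String offset;

    SpecificStartpoint(String offset) {
      this.offset = offset;
    }

    @Override
    public void apply(String ssp, Visitor visitor) {
      visitor.visit(ssp, this);
    }
  }

  static final class TimestampStartpoint implements Startpoint {
    final long timestampMs;

    TimestampStartpoint(long timestampMs) {
      this.timestampMs = timestampMs;
    }

    @Override
    public void apply(String ssp, Visitor visitor) {
      visitor.visit(ssp, this);
    }
  }

  // Records the action a system-specific handler would take for each startpoint type.
  static final class RecordingHandler implements Visitor {
    final List<String> actions = new ArrayList<>();

    @Override
    public void visit(String ssp, SpecificStartpoint startpoint) {
      actions.add("seek " + ssp + " to offset " + startpoint.offset);
    }

    @Override
    public void visit(String ssp, TimestampStartpoint startpoint) {
      actions.add("seek " + ssp + " by timestamp " + startpoint.timestampMs);
    }
  }
}
```

The benefit of this shape is that the consumer's start-up loop does not need to know which kind of startpoint it holds; adding a new startpoint type means adding one `visit` overload per system-specific handler.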
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257523191

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##
@@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
     metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }

+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {

Review comment:
Yes. The newly introduced API, `register(ssp, startpoint)`, provides guarantees that are a superset of those of the existing `register(ssp, offset)` API in SystemConsumer. After 1.0.1, we would eventually remove the existing `register(ssp, offset)` API.
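The "superset" relationship can be pictured as the offset-based overload wrapping its argument and delegating to the startpoint-based one, which is what a later revision of this patch does with `new StartpointSpecific(offset)`. The sketch below is a simplified illustration under assumptions: `RegisterDelegationSketch` and `SpecificOffset` are hypothetical stand-ins, and a string stands in for `SystemStreamPartition`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of an offset-based register delegating to a startpoint-based one.
final class RegisterDelegationSketch {

  // Simplified stand-in for a startpoint that carries a specific offset.
  static final class SpecificOffset {
    final String offset;

    SpecificOffset(String offset) {
      this.offset = offset;
    }
  }

  private final Map<String, SpecificOffset> registered = new HashMap<>();

  // Legacy entry point: wraps the raw offset and delegates, so no separate state is kept.
  void register(String ssp, String offset) {
    register(ssp, new SpecificOffset(offset));
  }

  // Startpoint-based entry point: the single place where registration state is stored.
  void register(String ssp, SpecificOffset startpoint) {
    registered.put(ssp, startpoint);
  }

  // Returns the registered offset for the SSP, or null if nothing was registered.
  String registeredOffset(String ssp) {
    SpecificOffset startpoint = registered.get(ssp);
    return startpoint == null ? null : startpoint.offset;
  }
}
```

With this arrangement the legacy method can be removed later without touching the registration logic, since it holds no behavior of its own.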
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257522508

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##
@@ -330,6 +387,73 @@ public String getSystemName() {
     return systemName;
   }

+  @VisibleForTesting
+  static class KafkaStartpointRegistrationHandler implements StartpointVisitor {
+
+    private final Consumer kafkaConsumer;
+
+    KafkaStartpointRegistrationHandler(Consumer kafkaConsumer) {
+      this.kafkaConsumer = kafkaConsumer;
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset());
+      LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint);
+
+      // KafkaConsumer is not thread-safe.
+      synchronized (kafkaConsumer) {
+        kafkaConsumer.seek(topicPartition, offsetInStartpoint);

Review comment:
1. The upper layers that call this API already log any exception that is thrown back.
2. Wrapping the exception in a SamzaException and throwing it back doesn't, IMHO, add much value.

What do you think?
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257523191

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##
@@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
     metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }

+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {

Review comment:
Yes. The newly introduced API, `register(ssp, startpoint)`, provides guarantees that are a superset of those of the existing `register(ssp, offset)` API in SystemConsumer. After 1.0.1, we would remove the existing `register(ssp, offset)` method.
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257523873

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##
@@ -60,16 +74,22 @@
   private final Config config;
   private final boolean fetchThresholdBytesEnabled;
   private final KafkaSystemConsumerMetrics metrics;
+  private final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler;

   // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
   final KafkaConsumerMessageSink messageSink;

   // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
   // BlockingEnvelopMap's buffers.
-  final private KafkaConsumerProxy proxy;
+  @VisibleForTesting
+  KafkaConsumerProxy proxy;

   // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets
   final Map topicPartitionsToOffset = new HashMap<>();

Review comment:
I'm aware that we'd have to make this change. My initial plan was to do it after migrating all the system consumers to implement at least the startpoint-specific visitor. We discussed this offline and agreed to make the change in this patch itself.
1. Removed `topicPartitionsToOffset` and delegated `register(ssp, offset)` API calls to the startpoint registration API.
2. Moved the offset comparator logic to the `SystemConsumers` layer.
   A. Only a few implementations of the SystemConsumer.register API currently use systemAdmin.offsetComparator to compare offsets. However, the comparator logic is common to all implementations of the SystemConsumer.register API. Moving it to the upper layer (`SystemConsumers`) would ensure functional correctness.
   B. All other upper layers except `SystemConsumers` that invoke the register(ssp, offset) API already pass the lowest offset for an SSP. We don't have a startpoint comparator abstraction yet. To delegate calls from register(ssp, offset) to the register-startpoint API, we had to get the lowest offset for an SSP from the SystemConsumers layer.
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257524023

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##
@@ -141,11 +166,14 @@ public void start() {

   private void startSubscription() {
     //subscribe to all the registered TopicPartitions
-    LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet());
+    Set registeredTopicPartitions = new HashSet<>();
+    registeredTopicPartitions.addAll(topicPartitionsToSSP.keySet());
+    registeredTopicPartitions.addAll(topicPartitionToStartpointMap.keySet());

Review comment:
This is no longer necessary now that we've agreed offline to make the register(ssp, offset) API delegate to the startpoint-register API.
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257074119

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##
@@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
     metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }

+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+    if (started.get()) {
+      String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+      throw new SamzaException(exceptionMessage);
+    }
+
+    if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
+      LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
+      return;
+    }
+
+    LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
+
+    super.register(systemStreamPartition, startpoint);
+
+    TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+    topicPartitionToStartpointMap.put(topicPartition, startpoint);

Review comment:
We discussed this offline earlier: we need a startpoint comparator to choose the lowest startpoint when there are multiple startpoints defined for an SSP. Example: broadcast streams, where Task-1 defines a lower startpoint and Task-2 defines a higher startpoint for the same SSP. It's already tracked in SAMZA-2088.
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r257523065 ## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ## @@ -205,6 +215,156 @@ public void testFetchThresholdBytesDiabled() { consumer.stop(); } + @Test + public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() { +// Define dummy variables for testing. +final Integer testPartitionId = 0; Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257523115

## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ##

@@ -205,6 +215,156 @@ public void testFetchThresholdBytesDiabled() {
     consumer.stop();
   }

+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Integer testPartitionId = 0;
+    final String offset = "0";
+    final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+    final Partition testPartition = new Partition(testPartitionId);
+    final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    KafkaStartpointRegistrationHandler
+        kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+    final StartpointSpecific testStartpointSpecific = new StartpointSpecific(offset);
+
+    // Mock the consumer interactions.
+    Mockito.doNothing().when(consumer).seek(testTopicPartition, Long.valueOf(offset));
+
+    // Invoke the consumer with startpoint.
+    kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, testStartpointSpecific);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(testTopicPartition, Long.valueOf(offset));
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Integer testPartitionId = 0;
+    final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+    final Partition testPartition = new Partition(testPartitionId);
+    final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+    final Long testTimeStamp = 10L;
+    final String testOffset = "10";
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    KafkaStartpointRegistrationHandler
+        kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+    final Map offsetForTimesResult = ImmutableMap.of(testTopicPartition, new OffsetAndTimestamp(Long.valueOf(testOffset), testTimeStamp));
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+    Mockito.doNothing().when(consumer).seek(testTopicPartition, Long.valueOf(testOffset));
+
+    kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, startpointTimestamp);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(testTopicPartition, Long.valueOf(testOffset));
+    Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+  }
+
+  @Test
+  public void testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Integer testPartitionId = 0;
+    final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+    final Partition testPartition = new Partition(testPartitionId);
+    final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaStartpointRegistrationHandler
+        kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+    final StartpointOldest testStartpointSpecific = new StartpointOldest();
+
+    // Mock the consumer interactions.
+    Mockito.doNothing().when(consumer).seekToBeginning(ImmutableList.of(testTopicPartition));
+
+    // Invoke the consumer with startpoint.
+    kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, testStartpointSpecific);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seekToBeginning(ImmutableList.of(testTopicPartition));
+  }
+
+  @Test
+  public void testStartpointUpcomingVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Integer testPartitionId = 0;
+    final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+    final Partition testPartition = new Partition(testPartitionId);
+    final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaSystemConsumer.KafkaStartpointRegistrationHandler
+
[GitHub] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257072998

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##

@@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
     metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }

+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+    if (started.get()) {
+      String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+      throw new SamzaException(exceptionMessage);
+    }
+
+    if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
+      LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
+      return;
+    }
+
+    LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
+
+    super.register(systemStreamPartition, startpoint);

Review comment: It already does and is already added as a part of this PR.