[GitHub] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r258302217

## File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala ##

```diff
@@ -208,7 +223,11 @@ class SystemConsumers (
       if (startpoint != null) {
         consumer.register(systemStreamPartition, startpoint)
       } else {
-        consumer.register(systemStreamPartition, offset)
+        val existingOffset = sspToRegisteredOffsets.get(systemStreamPartition)
+        val systemAdmin = systemAdmins.getSystemAdmin(systemStreamPartition.getSystem)
+        if (existingOffset == null || systemAdmin.offsetComparator(existingOffset, offset) > 0) {
+          sspToRegisteredOffsets.put(systemStreamPartition, offset)
```

Review comment:
It seems conceptually a bit odd to call `SystemConsumer.register` in `SystemConsumers.start`. If `SystemConsumer.register` works with multiple calls (so it just registers the offset from the latest call to `register`), then you could call `register` here, and then you wouldn't need to call it in `start`. Not sure if this is feasible, but I think it would make the flow a bit cleaner, since you wouldn't have to worry about doing anything extra in `start`.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
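The flow this comment proposes, where `SystemConsumers` tracks the lowest offset and calls `register` on the consumer each time that offset moves lower, can be sketched in isolation. This is a minimal sketch, not Samza code: `LowestOffsetRegistrar` is a hypothetical class, SSPs are plain strings, the `BiConsumer` stands in for `SystemConsumer.register`, and offsets are assumed to be numeric strings compared as longs (Kafka-style) rather than through a real `SystemAdmin.offsetComparator`. The replacement condition is the one in the hunk above.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the SystemConsumers side: tracks the lowest offset
// seen per SSP and forwards each new low to the consumer immediately.
class LowestOffsetRegistrar {
  // Assumption: offsets are numeric strings (Kafka-style), so compare them as longs.
  private static final Comparator<String> OFFSET_ORDER = Comparator.comparingLong(Long::parseLong);

  private final Map<String, String> sspToRegisteredOffsets = new HashMap<>();
  // Stands in for consumer.register(ssp, offset).
  private final BiConsumer<String, String> consumerRegister;

  LowestOffsetRegistrar(BiConsumer<String, String> consumerRegister) {
    this.consumerRegister = consumerRegister;
  }

  void register(String ssp, String offset) {
    String existingOffset = sspToRegisteredOffsets.get(ssp);
    // Same condition as the hunk: only an older offset replaces the existing one.
    if (existingOffset == null || OFFSET_ORDER.compare(existingOffset, offset) > 0) {
      sspToRegisteredOffsets.put(ssp, offset);
      // The last call the consumer sees is therefore always the lowest offset.
      consumerRegister.accept(ssp, offset);
    }
  }
}
```

Because the last call the consumer receives always carries the lowest offset, the consumer only needs "latest call wins" semantics for `register`, which is the precondition the comment asks about. Comparing numerically matters here: as strings, `"10"` sorts before `"9"`.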
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r258284619

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##

```diff
@@ -258,40 +260,28 @@ public void stop() {
    */
   @Override
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
+    register(systemStreamPartition, new StartpointSpecific(offset));
+  }
+
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
     if (started.get()) {
-      String msg = String.format("%s: Trying to register partition after consumer has been started. ssp=%s", this,
-          systemStreamPartition);
-      throw new SamzaException(msg);
+      String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
```

Review comment:
Minor: Is the class name at the beginning of the message necessary? I think log4j will already give you that, and any classes which extend this class will have a slightly misleading message.
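The subclass problem this comment points out is easy to reproduce. A minimal sketch with hypothetical stand-in classes (not the Samza types): a prefix hard-coded into the format string keeps naming the base class when a subclass builds the message, while a prefix taken from `getClass()` at runtime stays accurate. Dropping the prefix entirely, as the comment suggests, avoids the issue as well.

```java
// Hypothetical stand-ins: BaseConsumer plays the role of KafkaSystemConsumer and
// CustomConsumer the role of a class that extends it.
class BaseConsumer {
  // Prefix fixed at compile time: misleading for every subclass.
  String hardCodedMessage(String ssp) {
    return String.format("BaseConsumer: registration of ssp: %s failed.", ssp);
  }

  // Prefix taken from the runtime class: always names the class that failed.
  String runtimeMessage(String ssp) {
    return String.format("%s: registration of ssp: %s failed.", getClass().getSimpleName(), ssp);
  }
}

class CustomConsumer extends BaseConsumer { }
```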
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r258283701

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##

```diff
@@ -159,30 +179,16 @@ private void startSubscription() {
    */
   void startConsumer() {
     // set the offset for each TopicPartition
-    if (topicPartitionsToOffset.size() <= 0) {
+    if (topicPartitionToStartpointMap.size() <= 0) {
       LOG.error ("{}: Consumer is not subscribed to any SSPs", this);
     }
-    topicPartitionsToOffset.forEach((tp, startingOffsetString) -> {
-      long startingOffset = Long.valueOf(startingOffsetString);
-
-      try {
-        synchronized (kafkaConsumer) {
-          kafkaConsumer.seek(tp, startingOffset); // this value should already be the 'upcoming' value
-        }
-      } catch (Exception e) {
-        // all recoverable execptions are handled by the client.
-        // if we get here there is nothing left to do but bail out.
-        String msg =
-            String.format("%s: Got Exception while seeking to %s for partition %s", this, startingOffsetString, tp);
-        LOG.error(msg, e);
-        throw new SamzaException(msg, e);
-      }
-
-      LOG.info("{}: Changing consumer's starting offset for tp = {} to {}", this, tp, startingOffsetString);
-
+    topicPartitionToStartpointMap.forEach((topicPartition, startpoint) -> {
+      Partition partition = new Partition(topicPartition.partition());
+      SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, topicPartition.topic(), partition);
+      startpoint.apply(systemStreamPartition, kafkaStartpointRegistrationHandler);
       // add the partition to the proxy
-      proxy.addTopicPartition(topicPartitionsToSSP.get(tp), startingOffset);
+      proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
```

Review comment:
Looks like the previous comment about this got marked as "outdated". Can this be moved into the registration handler? Seems like this functionality would fall into the responsibility of that object.
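One reading of the suggestion is that each `visit` method both positions the consumer and registers the partition with the proxy, so `startConsumer` only dispatches. The sketch below uses hypothetical stand-ins throughout: `Startpoint`, `SpecificStartpoint`, `OldestStartpoint`, `Visitor`, and `FakeProxy` are not the Samza or Kafka types, partitions are plain strings, and "oldest" is modeled as offset 0 instead of a real `seekToBeginning`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the startpoint types and the visitor interface.
interface Visitor {
  void visit(String partition, SpecificStartpoint startpoint);
  void visit(String partition, OldestStartpoint startpoint);
}

interface Startpoint {
  void apply(String partition, Visitor visitor);
}

final class SpecificStartpoint implements Startpoint {
  final long offset;

  SpecificStartpoint(long offset) {
    this.offset = offset;
  }

  @Override
  public void apply(String partition, Visitor visitor) {
    visitor.visit(partition, this); // double dispatch selects the SpecificStartpoint overload
  }
}

final class OldestStartpoint implements Startpoint {
  @Override
  public void apply(String partition, Visitor visitor) {
    visitor.visit(partition, this);
  }
}

// Stand-in for KafkaConsumerProxy: records the position each partition starts from.
class FakeProxy {
  final Map<String, Long> startingPositions = new HashMap<>();

  void addTopicPartition(String partition, long position) {
    startingPositions.put(partition, position);
  }
}

// The handler owns both steps, so the caller's loop only dispatches.
class RegistrationHandler implements Visitor {
  private final FakeProxy proxy;

  RegistrationHandler(FakeProxy proxy) {
    this.proxy = proxy;
  }

  @Override
  public void visit(String partition, SpecificStartpoint startpoint) {
    seekAndRegister(partition, startpoint.offset);
  }

  @Override
  public void visit(String partition, OldestStartpoint startpoint) {
    seekAndRegister(partition, 0L); // assumption: the oldest retained offset is 0
  }

  private void seekAndRegister(String partition, long position) {
    // A real handler would call kafkaConsumer.seek(...) here and then read
    // kafkaConsumer.position(...); this sketch just uses the target position.
    proxy.addTopicPartition(partition, position);
  }
}
```

With the proxy call inside the handler, the loop in `startConsumer` shrinks to `startpoints.forEach((tp, sp) -> sp.apply(tp, handler))`, and each new startpoint type only needs a new `visit` overload.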
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r258281900

## File path: samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java ##

```diff
@@ -38,6 +38,10 @@ public SystemAdmins(Config config) {
     this.systemAdminMap = systemConfig.getSystemAdmins();
   }
+  public SystemAdmins(Map systemAdminMap) {
```

Review comment:
Is this just for tests? Is it possible to avoid doing this? Could you make a mock `SystemAdmins` instead? Maybe just always have it return a mock `SystemAdmin`?
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r258295838

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##

```diff
@@ -60,16 +74,22 @@
   private final Config config;
   private final boolean fetchThresholdBytesEnabled;
   private final KafkaSystemConsumerMetrics metrics;
+  private final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler;
   // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
   final KafkaConsumerMessageSink messageSink;
   // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
   // BlockingEnvelopMap's buffers.
-  final private KafkaConsumerProxy proxy;
+  @VisibleForTesting
+  KafkaConsumerProxy proxy;
   // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets
   final Map topicPartitionsToOffset = new HashMap<>();
```

Review comment:
2A. I agree that it is odd for each specific `SystemConsumer` implementation to compare offsets. It seems outside the scope of this PR, though. Can we move that comparator logic refactoring to a separate PR so that we can cleanly separate the changes? That PR could be committed before this one.

2B. I looked at some other code which uses `register` (e.g. `ContainerStorageManager`, `CoordinatorStreamStore`, and `CoordinatorStreamSystemConsumer`). Those don't really seem to do any comparisons to make sure they are registering the lowest offset. How can you tell that all of the upper layers already pass the lowest offset? I just want to double-check that there wasn't a bug in an upper layer which happened to be prevented by having the offset comparator logic directly in the `SystemConsumer` impls.
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r258286825

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##

```diff
@@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
     metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+    if (started.get()) {
+      String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+      throw new SamzaException(exceptionMessage);
+    }
+
+    if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
+      LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
+      return;
+    }
+
+    LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
+
+    super.register(systemStreamPartition, startpoint);
+
+    TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+    topicPartitionToStartpointMap.put(topicPartition, startpoint);
```

Review comment:
I was thinking of the case where `existingOffset` corresponds to the current checkpoint, and someone tries to set the offset to something after the current checkpoint. It looks like this code prevents someone from explicitly setting an offset to something after the `existingOffset`. Now that you added the offset check to `SystemConsumers`, it looks like this removed functionality is now handled there.
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257073732

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##

```diff
@@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
     metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+    if (started.get()) {
+      String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+      throw new SamzaException(exceptionMessage);
+    }
+
+    if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
+      LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
+      return;
+    }
+
+    LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
+
+    super.register(systemStreamPartition, startpoint);
```

Review comment:
Oops, I lost track of your change to `BlockingEnvelopeMap` above. Ignore the above comment.
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257063620

## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ##

```diff
@@ -205,6 +215,156 @@ public void testFetchThresholdBytesDiabled() {
     consumer.stop();
   }
+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Integer testPartitionId = 0;
+    final String offset = "0";
+    final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+    final Partition testPartition = new Partition(testPartitionId);
+    final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    KafkaStartpointRegistrationHandler
+        kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+    final StartpointSpecific testStartpointSpecific = new StartpointSpecific(offset);
+
+    // Mock the consumer interactions.
+    Mockito.doNothing().when(consumer).seek(testTopicPartition, Long.valueOf(offset));
+
+    // Invoke the consumer with startpoint.
+    kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, testStartpointSpecific);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(testTopicPartition, Long.valueOf(offset));
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Integer testPartitionId = 0;
+    final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+    final Partition testPartition = new Partition(testPartitionId);
+    final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+    final Long testTimeStamp = 10L;
+    final String testOffset = "10";
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    KafkaStartpointRegistrationHandler
+        kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+    final Map offsetForTimesResult = ImmutableMap.of(testTopicPartition, new OffsetAndTimestamp(Long.valueOf(testOffset), testTimeStamp));
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+    Mockito.doNothing().when(consumer).seek(testTopicPartition, Long.valueOf(testOffset));
+
+    kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, startpointTimestamp);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(testTopicPartition, Long.valueOf(testOffset));
+    Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+  }
+
+  @Test
+  public void testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Integer testPartitionId = 0;
+    final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+    final Partition testPartition = new Partition(testPartitionId);
+    final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaStartpointRegistrationHandler
+        kafkaStartpointRegistrationHandler = new KafkaSystemConsumer.KafkaStartpointRegistrationHandler(consumer);
+
+    final StartpointOldest testStartpointSpecific = new StartpointOldest();
+
+    // Mock the consumer interactions.
+    Mockito.doNothing().when(consumer).seekToBeginning(ImmutableList.of(testTopicPartition));
+
+    // Invoke the consumer with startpoint.
+    kafkaStartpointRegistrationHandler.visit(testSystemStreamPartition, testStartpointSpecific);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seekToBeginning(ImmutableList.of(testTopicPartition));
+  }
+
+  @Test
+  public void testStartpointUpcomingVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Integer testPartitionId = 0;
+    final TopicPartition testTopicPartition = new TopicPartition(TEST_STREAM, testPartitionId);
+    final Partition testPartition = new Partition(testPartitionId);
+    final SystemStreamPartition testSystemStreamPartition = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, testPartition);
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaSystemConsumer.KafkaStartpointRegistrationHandler
```
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257061763

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##

```diff
@@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
     metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+    if (started.get()) {
+      String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+      throw new SamzaException(exceptionMessage);
+    }
+
+    if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
+      LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
+      return;
+    }
+
+    LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
+
+    super.register(systemStreamPartition, startpoint);
+
+    TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+    topicPartitionToStartpointMap.put(topicPartition, startpoint);
```

Review comment:
There is logic in the old `register` which checks that the requested offset is older than the existing offset, in order to avoid missing messages. Just double checking that we want to intentionally leave that out?
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257062873

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##

```diff
@@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
     metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
+    if (started.get()) {
+      String exceptionMessage = String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, startpoint: %s failed.", this, systemStreamPartition, startpoint);
+      throw new SamzaException(exceptionMessage);
+    }
+
+    if (!Objects.equals(systemStreamPartition.getSystem(), systemName)) {
+      LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
+      return;
+    }
+
+    LOG.debug("Registering the ssp: {}, startpoint: {} with the consumer.", systemStreamPartition, startpoint);
+
+    super.register(systemStreamPartition, startpoint);
```

Review comment:
Does `BlockingEnvelopeMap` implement this `register`? Seems like this will throw an exception.
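Whether `super.register(systemStreamPartition, startpoint)` throws depends on which implementation the superclass exposes. A minimal sketch, assuming hypothetically that the interface supplies a default `register` that throws `UnsupportedOperationException`; `ConsumerApi` and the other class names are stand-ins, not the Samza API. A `super` call reaches that default whenever the intermediate base class does not override it, which is why the `BlockingEnvelopeMap` change referred to in the "Oops" comment resolves the concern.

```java
// Hypothetical interface whose default rejects startpoint registration.
interface ConsumerApi {
  default void register(String ssp, Object startpoint) {
    throw new UnsupportedOperationException("register(ssp, startpoint) is not supported");
  }
}

// Base class that does NOT override the default: the case the comment worries about.
class PlainEnvelopeMap implements ConsumerApi { }

// Base class that DOES override it: the case after the base class is changed.
class StartpointAwareEnvelopeMap implements ConsumerApi {
  int registrations = 0;

  @Override
  public void register(String ssp, Object startpoint) {
    registrations++;
  }
}

class ConsumerOnPlainBase extends PlainEnvelopeMap {
  @Override
  public void register(String ssp, Object startpoint) {
    super.register(ssp, startpoint); // resolves to the interface default, which throws
  }
}

class ConsumerOnAwareBase extends StartpointAwareEnvelopeMap {
  @Override
  public void register(String ssp, Object startpoint) {
    super.register(ssp, startpoint); // resolves to the base-class override
  }
}
```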
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257058746

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##

```diff
@@ -141,11 +166,14 @@ public void start() {
   private void startSubscription() {
     //subscribe to all the registered TopicPartitions
-    LOG.info("{}: Consumer subscribes to {}", this, topicPartitionsToSSP.keySet());
+    Set registeredTopicPartitions = new HashSet<>();
+    registeredTopicPartitions.addAll(topicPartitionsToSSP.keySet());
+    registeredTopicPartitions.addAll(topicPartitionToStartpointMap.keySet());
```

Review comment:
What is the case where this has a `TopicPartition`, but `topicPartitionsToSSP` does not have it?
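The question amounts to a set difference: the partitions that the union adds beyond `topicPartitionsToSSP` are exactly those present only in `topicPartitionToStartpointMap`. A minimal sketch with partitions as plain strings standing in for `TopicPartition`; `SubscriptionSets` and its methods are hypothetical helpers.

```java
import java.util.HashSet;
import java.util.Set;

class SubscriptionSets {
  // What startSubscription would subscribe to: the union of both key sets.
  static Set<String> registered(Set<String> fromSspMap, Set<String> fromStartpointMap) {
    Set<String> all = new HashSet<>(fromSspMap);
    all.addAll(fromStartpointMap);
    return all;
  }

  // Partitions that only the startpoint map knows about: the case the comment asks about.
  static Set<String> startpointOnly(Set<String> fromSspMap, Set<String> fromStartpointMap) {
    Set<String> only = new HashSet<>(fromStartpointMap);
    only.removeAll(fromSspMap);
    return only;
  }
}
```

If every registration path fills both maps, `startpointOnly` is always empty and the second `addAll` adds nothing.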
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257061221

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##

```diff
@@ -330,6 +387,73 @@ public String getSystemName() {
     return systemName;
   }
+  @VisibleForTesting
+  static class KafkaStartpointRegistrationHandler implements StartpointVisitor {
+
+    private final Consumer kafkaConsumer;
+
+    KafkaStartpointRegistrationHandler(Consumer kafkaConsumer) {
+      this.kafkaConsumer = kafkaConsumer;
+    }
+
+    @Override
+    public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+      TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+      long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset());
+      LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint);
+
+      // KafkaConsumer is not thread-safe.
+      synchronized (kafkaConsumer) {
+        kafkaConsumer.seek(topicPartition, offsetInStartpoint);
```

Review comment:
`kafkaConsumer.seek` is wrapped in a try-catch above. Should that be done here as well?
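Carrying the old error handling into the visitor could look like the sketch below. It is hypothetical: `Seek` is a stand-in functional interface for `kafkaConsumer.seek`, `IllegalStateException` stands in for `SamzaException`, the message is adapted from the removed `startConsumer` code, and the `LOG.error` call is omitted. As in the original, the seek runs while holding the consumer's lock, because `KafkaConsumer` is not thread-safe.

```java
// Hypothetical stand-in for kafkaConsumer.seek(topicPartition, offset).
interface Seek {
  void seek(String topicPartition, long offset);
}

class GuardedSeek {
  private final Object consumerLock;
  private final Seek seek;

  GuardedSeek(Object consumerLock, Seek seek) {
    this.consumerLock = consumerLock;
    this.seek = seek;
  }

  void seekOrFail(String topicPartition, long offset) {
    try {
      // Serialize access, mirroring synchronized (kafkaConsumer) in the original.
      synchronized (consumerLock) {
        seek.seek(topicPartition, offset);
      }
    } catch (Exception e) {
      // Per the original comment, recoverable errors are handled inside the client,
      // so anything reaching here is fatal: wrap it with context and rethrow.
      String msg = String.format("Got Exception while seeking to %s for partition %s", offset, topicPartition);
      throw new IllegalStateException(msg, e);
    }
  }
}
```

Putting the try-catch in one helper lets every `visit` method share it instead of repeating the block for each startpoint type.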
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257063162

## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ##

```diff
@@ -205,6 +215,156 @@ public void testFetchThresholdBytesDiabled() {
     consumer.stop();
   }
+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Integer testPartitionId = 0;
```

Review comment:
Maybe extract some of these dummy variables to static variables in the test class, so they can be shared with your other visitor tests?
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257062107

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##

```diff
@@ -285,6 +321,27 @@ public void register(SystemStreamPartition systemStreamPartition, String offset)
     metrics.registerTopicAndPartition(toTopicAndPartition(tp));
   }
+  @Override
+  public void register(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
```

Review comment:
Do you plan to remove the other `register` method once 1.0.1 is released, or do you plan to support the other `register` for longer?
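If the offset-based `register` is kept for a transition period, a common shape is to mark it `@Deprecated` and have it delegate to the startpoint-based overload, as the `register(systemStreamPartition, offset)` change quoted earlier in this thread already does with `new StartpointSpecific(offset)`. A minimal sketch with hypothetical stand-ins (`SpecificOffset` for `StartpointSpecific`, strings for SSPs); when to remove the old method is the open question in the comment.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for StartpointSpecific: a startpoint pinned to one offset.
final class SpecificOffset {
  final String offset;

  SpecificOffset(String offset) {
    this.offset = offset;
  }
}

class StartpointConsumer {
  // Single source of truth for registrations, keyed by partition.
  final Map<String, SpecificOffset> partitionToStartpoint = new LinkedHashMap<>();

  /** @deprecated use {@link #register(String, SpecificOffset)} instead. */
  @Deprecated
  void register(String ssp, String offset) {
    register(ssp, new SpecificOffset(offset)); // old entry point delegates to the new one
  }

  void register(String ssp, SpecificOffset startpoint) {
    partitionToStartpoint.put(ssp, startpoint);
  }
}
```

Because both entry points write to the same map, this shape would also make a separate `topicPartitionsToOffset` map unnecessary.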
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257063321

## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ##
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257058381

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##

```diff
@@ -60,16 +74,22 @@
   private final Config config;
   private final boolean fetchThresholdBytesEnabled;
   private final KafkaSystemConsumerMetrics metrics;
+  private final KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler;
   // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap.
   final KafkaConsumerMessageSink messageSink;
   // This proxy contains a separate thread, which reads kafka messages (with consumer.poll()) and populates
   // BlockingEnvelopMap's buffers.
-  final private KafkaConsumerProxy proxy;
+  @VisibleForTesting
+  KafkaConsumerProxy proxy;
   // keep registration data until the start - mapping between registered SSPs and topicPartitions, and their offsets
   final Map topicPartitionsToOffset = new HashMap<>();
```

Review comment:
Can we get rid of this and use `topicPartitionToStartpointMap` instead? If the plan was to do that later after removing the offset-based registration, then maybe this PR can just wait until after we have done that.
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r257060720

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ##

```diff
@@ -163,26 +191,35 @@ void startConsumer() {
       LOG.error ("{}: Consumer is not subscribed to any SSPs", this);
     }
-    topicPartitionsToOffset.forEach((tp, startingOffsetString) -> {
-      long startingOffset = Long.valueOf(startingOffsetString);
+    // If both the startpoint and checkpointed offsets are present for a topic partition,
+    // then precedence is given to the startpoint.
+    topicPartitionToStartpointMap.forEach((topicPartition, startpoint) -> {
+      Partition partition = new Partition(topicPartition.partition());
+      SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, topicPartition.topic(), partition);
+      startpoint.apply(systemStreamPartition, kafkaStartpointRegistrationHandler);
+      // add the partition to the proxy
+      proxy.addTopicPartition(topicPartitionsToSSP.get(topicPartition), kafkaConsumer.position(topicPartition));
```

Review comment:
Can this be moved into the registration handler? Seems like this functionality would fall into the responsibility of that object.
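The precedence rule in the new code comment (a startpoint wins over a checkpointed offset for the same partition) is a map merge in which the startpoint entries are applied last. A minimal sketch with hypothetical names; partitions, offsets, and startpoints are plain strings, and the `checkpoint:`/`startpoint:` labels exist only to make the result easy to inspect.

```java
import java.util.HashMap;
import java.util.Map;

class StartingPositionResolver {
  // Returns, per partition, where consumption should begin. Checkpointed offsets
  // are the default; a startpoint for the same partition overrides it.
  static Map<String, String> resolve(Map<String, String> checkpointedOffsets,
                                     Map<String, String> startpoints) {
    Map<String, String> resolved = new HashMap<>();
    checkpointedOffsets.forEach((tp, offset) -> resolved.put(tp, "checkpoint:" + offset));
    // Applied second, so a startpoint replaces any checkpoint entry for the same partition.
    startpoints.forEach((tp, startpoint) -> resolved.put(tp, "startpoint:" + startpoint));
    return resolved;
  }
}
```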