[GitHub] [samza] Sanil15 commented on a change in pull request #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner
Sanil15 commented on a change in pull request #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner URL: https://github.com/apache/samza/pull/962#discussion_r267150940 ## File path: samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java ## @@ -263,4 +268,16 @@ public void genData(Map> inputPartitionData, Map
[GitHub] [samza] cameronlee314 commented on a change in pull request #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner
cameronlee314 commented on a change in pull request #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner URL: https://github.com/apache/samza/pull/962#discussion_r267149842 ## File path: samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java ## @@ -263,4 +268,16 @@ public void genData(Map> inputPartitionData, Map
[GitHub] [samza] Sanil15 commented on a change in pull request #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner
Sanil15 commented on a change in pull request #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner URL: https://github.com/apache/samza/pull/962#discussion_r267147519 ## File path: samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java ## @@ -263,4 +268,10 @@ public void genData(Map> inputPartitionData, Map
[GitHub] [samza] xinyuiscool merged pull request #944: Release version updates
xinyuiscool merged pull request #944: Release version updates URL: https://github.com/apache/samza/pull/944 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [samza] cameronlee314 commented on a change in pull request #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner
cameronlee314 commented on a change in pull request #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner URL: https://github.com/apache/samza/pull/962#discussion_r267146513 ## File path: samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java ## @@ -263,4 +268,10 @@ public void genData(Map> inputPartitionData, Map
[GitHub] [samza] Sanil15 commented on issue #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner
Sanil15 commented on issue #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner URL: https://github.com/apache/samza/pull/962#issuecomment-474632748 @cameronlee314 please have a look!
[GitHub] [samza] Sanil15 opened a new pull request #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner
Sanil15 opened a new pull request #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner URL: https://github.com/apache/samza/pull/962 - Added a public API to pass ExternalContext to TestRunner - Refactored existing tests to test it
[GitHub] [samza] dxichen commented on issue #944: Release version updates
dxichen commented on issue #944: Release version updates URL: https://github.com/apache/samza/pull/944#issuecomment-474626523 @shanthoosh @jagadish-v0 @prateekm Please take a look
[GitHub] [samza] dxichen commented on a change in pull request #944: Release version updates
dxichen commented on a change in pull request #944: Release version updates
URL: https://github.com/apache/samza/pull/944#discussion_r267139268

## File path: docs/learn/tutorials/versioned/samza-rest-getting-started.md ##
@@ -48,7 +48,7 @@ Run the following commands:
 {% highlight bash %}
 cd samza-rest/build/distributions/
 mkdir -p deploy/samza-rest
-tar -xvf ./samza-rest-1.0.1-SNAPSHOT.tgz -C deploy/samza-rest
+tar -xvf ./samza-rest-1.1.1-SNAPSHOT.tgz -C deploy/samza-rest

Review comment:
1. I believe since this is on master it should be the SNAPSHOTTED version of the next release (i.e. 1.1.1-SNAPSHOT) instead of the latest released version.
2. Following the convention of the previous releases, master should be named as a SNAPSHOT to be picked up in maven local.
[GitHub] [samza] dengpanyin commented on issue #961: SAMZA-2134:Enable table rate limiter by default.
dengpanyin commented on issue #961: SAMZA-2134:Enable table rate limiter by default. URL: https://github.com/apache/samza/pull/961#issuecomment-474618197 @weisong44 Can you help review this RB? Thanks
[GitHub] [samza] dengpanyin opened a new pull request #961: SAMZA-2134:Enable table rate limiter by default.
dengpanyin opened a new pull request #961: SAMZA-2134:Enable table rate limiter by default. URL: https://github.com/apache/samza/pull/961 By default, the rate limiter will be enabled. The user needs to either provide a rate limiter for the table descriptor or explicitly disable the read/write rate limiter.
[GitHub] [samza] dxichen opened a new pull request #960: Update website for 1.1.0
dxichen opened a new pull request #960: Update website for 1.1.0 URL: https://github.com/apache/samza/pull/960 @vjagadish @shanthoosh Followed the instructions as per https://github.com/apache/samza/blob/master/docs/README.md#release-new-version-website-checklist Please let me know if I need to update the download version with 1.1.0 as well.
[GitHub] [samza] shanthoosh merged pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh merged pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918
[GitHub] [samza] shanthoosh merged pull request #956: SAMZA-2132: Flatten startpoint key when serialized.
shanthoosh merged pull request #956: SAMZA-2132: Flatten startpoint key when serialized. URL: https://github.com/apache/samza/pull/956
[GitHub] [samza] shanthoosh merged pull request #959: SAMZA-2133: Fix the TestKeyValuePerformance test.
shanthoosh merged pull request #959: SAMZA-2133: Fix the TestKeyValuePerformance test. URL: https://github.com/apache/samza/pull/959
[GitHub] [samza] shanthoosh commented on issue #959: SAMZA-2133: Fix the TestKeyValuePerformance test.
shanthoosh commented on issue #959: SAMZA-2133: Fix the TestKeyValuePerformance test. URL: https://github.com/apache/samza/pull/959#issuecomment-474583030 Ran the test case locally after this fix to verify that the failure no longer occurs. @sborya @dnishimura Please take a look.
[GitHub] [samza] shanthoosh opened a new pull request #959: SAMZA-2133: Fix the TestKeyValuePerformance test.
shanthoosh opened a new pull request #959: SAMZA-2133: Fix the TestKeyValuePerformance test. URL: https://github.com/apache/samza/pull/959 Running the TestKeyValuePerformance test results in the following failure:
```
2019-03-19 11:16:36.713 [main] TestKeyValuePerformance$ [INFO] Running test: rocksdb-write-performance
Exception in thread "main" java.lang.IllegalArgumentException: Job name is not defined in configuration
	at org.apache.samza.context.JobContextImpl.fromConfigWithDefaults(JobContextImpl.java:49)
	at org.apache.samza.test.performance.TestKeyValuePerformance$$anonfun$invokeTest$2$$anonfun$apply$1.apply$mcVI$sp(TestKeyValuePerformance.scala:126)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
	at org.apache.samza.test.performance.TestKeyValuePerformance$$anonfun$invokeTest$2.apply(TestKeyValuePerformance.scala:114)
	at org.apache.samza.test.performance.TestKeyValuePerformance$$anonfun$invokeTest$2.apply(TestKeyValuePerformance.scala:112)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
	at org.apache.samza.test.performance.TestKeyValuePerformance$.invokeTest(TestKeyValuePerformance.scala:112)
	at org.apache.samza.test.performance.TestKeyValuePerformance$$anonfun$main$1.apply(TestKeyValuePerformance.scala:80)
	at org.apache.samza.test.performance.TestKeyValuePerformance$$anonfun$main$1.apply(TestKeyValuePerformance.scala:77)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at org.apache.samza.test.performance.TestKeyValuePerformance$.main(TestKeyValuePerformance.scala:77)
	at org.apache.samza.test.performance.TestKeyValuePerformance.main(TestKeyValuePerformance.scala)
:samza-shell:kvPerformanceTest FAILED
```
Through SAMZA-1714, we added a verification in `JobContextImpl` that validates the presence of `JobId` and `JobName` in the configuration. These two configurations are not populated in the config of `TestKeyValuePerformance` before the test is launched. The fix is to add the two configurations, `JobId` and `JobName`, to the config before launching the test.
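The failure and the fix described in this PR can be sketched without Samza on the classpath. This is only an illustration: a plain `Map<String, String>` stands in for the Samza config, `requireJobIdentity` is a hypothetical stand-in for the validation in `JobContextImpl` (not its actual code), and the `job.id` error message, the `withJobIdentity` helper, and the sample values are assumptions. `job.name` and `job.id` are assumed to be the config keys behind `JobName` and `JobId`.

```java
import java.util.HashMap;
import java.util.Map;

final class JobIdentityCheckSketch {
  // Hypothetical stand-in for the validation added through SAMZA-1714.
  static void requireJobIdentity(Map<String, String> config) {
    if (!config.containsKey("job.name")) {
      // Same message as in the stack trace quoted in the PR description.
      throw new IllegalArgumentException("Job name is not defined in configuration");
    }
    if (!config.containsKey("job.id")) {
      // Assumed message; the PR description only shows the job-name failure.
      throw new IllegalArgumentException("Job id is not defined in configuration");
    }
  }

  // Mirrors the described fix: populate both keys before launching the test.
  static Map<String, String> withJobIdentity(Map<String, String> base, String jobName, String jobId) {
    Map<String, String> copy = new HashMap<>(base);
    copy.put("job.name", jobName);
    copy.put("job.id", jobId);
    return copy;
  }

  public static void main(String[] args) {
    Map<String, String> testConfig = new HashMap<>();
    try {
      requireJobIdentity(testConfig);
    } catch (IllegalArgumentException e) {
      System.out.println("Before fix: " + e.getMessage());
    }
    // Sample values are placeholders, not the ones used in the actual PR.
    requireJobIdentity(withJobIdentity(testConfig, "kv-performance-test", "1"));
    System.out.println("After fix: validation passes");
  }
}
```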
[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0 URL: https://github.com/apache/samza/pull/951#discussion_r267088346 ## File path: samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ## @@ -93,48 +95,45 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness { } def createTopic(topicName: String, partitionCount: Int) { -AdminUtils.createTopic( - zkUtils, - topicName, - partitionCount, - REPLICATION_FACTOR) +createTopic(topicName, partitionCount, REPLICATION_FACTOR) } def validateTopic(topic: String, expectedPartitionCount: Int) { var done = false var retries = 0 -val maxRetries = 100 -while (!done && retries < maxRetries) { +while (!done && retries < 100) { try { -val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), SYSTEM, metadataStore.getTopicInfo) -val topicMetadata = topicMetadataMap(topic) + if (!zkClient.topicExists(topic)) { Review comment: Looks like indentation might be off?
[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0 URL: https://github.com/apache/samza/pull/951#discussion_r267095405 ## File path: samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala ## @@ -257,23 +250,27 @@ class StreamTaskTestUtil { props.put("group.id", group) props.put("auto.offset.reset", "smallest") -val consumerConfig = new ConsumerConfig(props) -val consumerConnector = Consumer.create(consumerConfig) -val stream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head.iterator -var message: MessageAndMetadata[Array[Byte], Array[Byte]] = null +val consumerConnector = new KafkaConsumer(props) +consumerConnector.subscribe(Set(topic).asJava) + +var stream = consumerConnector.poll(Duration.ofMillis(100)).iterator() +var message: ConsumerRecord[Nothing, Nothing] = null var messages = ArrayBuffer[String]() while (message == null || message.offset < maxOffsetInclusive) { - message = stream.next - if (message.message == null) { -messages += null + if (stream.hasNext) { +message = stream.next +if (message.value() == null) { + messages += null +} else { + messages += new String(message.value, "UTF-8") +} +System.err.println("StreamTaskTestUtil.readAll(): offset=%s, message=%s" format (message.offset, messages.last)) Review comment: I know this was already here, but it doesn't seem like an error. Could it just be removed, or at least changed to System.out?
[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0 URL: https://github.com/apache/samza/pull/951#discussion_r267086592 ## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java ## @@ -275,11 +280,62 @@ public void testClearStream() { TopicDescription td = dtr.all().get().get(topicName); Assert.fail("topic " + topicName + " should've been removed. td=" + td); } catch (Exception e) { - if (e.getCause() instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException) { -// expected - } else { + if (!(e.getCause() instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException)) { Assert.fail("topic " + topicName + " should've been removed. Expected UnknownTopicOrPartitionException."); } } } + + @Test + public void testShouldAssembleMetadata () { +Map oldestOffsets = new ImmutableMap.Builder() +.put(new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)), "o1") +.put(new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)), "o2") +.put(new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)), "o3") +.put(new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)), "o4") +.build(); + Review comment: Minor: extra whitespace
[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0 URL: https://github.com/apache/samza/pull/951#discussion_r267093231 ## File path: samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ## @@ -301,12 +260,6 @@ class TestKafkaSystemAdmin { val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka") systemAdmin.createStream(spec) validateTopic(topic, 1) -val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo) -assertTrue(topicMetadataMap.contains(topic)) -val topicMetadata = topicMetadataMap(topic) -val partitionMetadata = topicMetadata.partitionsMetadata.head -assertEquals(0, partitionMetadata.partitionId) -assertEquals(3, partitionMetadata.replicas.size) Review comment: Is it difficult to verify this now?
[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0 URL: https://github.com/apache/samza/pull/951#discussion_r267059448 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java ## @@ -717,4 +744,21 @@ private OffsetsMaps(Map oldestOffsets, return upcomingOffsets; } } + + /** + * A helper class for represent changelog related information. + */ + private static class ChangelogInfo { +int replicationFactor; Review comment: Could you make this an immutable object with getters?
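What the reviewer is asking for might look roughly like the sketch below. Only the class name `ChangelogInfo` and the `replicationFactor` field come from the quoted diff; the second field (`topicProperties`) and the demo values are hypothetical, included to show how a mutable member can be defensively copied. The class is shown as a top-level class for brevity, whereas the PR declares it as a private static nested class.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Sketch of an immutable value object: final class, final fields, no setters.
final class ChangelogInfo {
  private final int replicationFactor;
  // Hypothetical field, not taken from the PR; illustrates defensive copying.
  private final Map<String, String> topicProperties;

  ChangelogInfo(int replicationFactor, Map<String, String> topicProperties) {
    this.replicationFactor = replicationFactor;
    // Copy first, then wrap, so later changes to the caller's map are not visible here.
    this.topicProperties = Collections.unmodifiableMap(new HashMap<>(topicProperties));
  }

  int getReplicationFactor() {
    return replicationFactor;
  }

  Map<String, String> getTopicProperties() {
    return topicProperties;
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("cleanup.policy", "compact");
    ChangelogInfo info = new ChangelogInfo(3, props);
    props.clear(); // does not affect the already-built object
    System.out.println(info.getReplicationFactor() + " " + info.getTopicProperties());
  }
}
```

With final fields and no setters, an instance can be shared between callers without any of them being able to change the replication factor after construction.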
[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0 URL: https://github.com/apache/samza/pull/951#discussion_r267087682 ## File path: samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ## @@ -110,15 +110,12 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { kcm1.stop // check that start actually creates the topic with log compaction enabled -val zkClient = ZkUtils(zkConnect, 6000, 6000, JaasUtils.isZkSecurityEnabled()) -val topicConfig = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, checkpointTopic) +val topicConfig = adminZkClient.getAllTopicConfigs().getOrElse(checkpointTopic, new Properties()) Review comment: Just double checking: Does `adminZkClient` get managed (including lifecycle like `close`) through `KafkaServerTestHarness` now?
[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0 URL: https://github.com/apache/samza/pull/951#discussion_r267091668 ## File path: samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala ## @@ -257,17 +211,22 @@ class TestKafkaSystemAdmin { assertEquals("2", sspMetadata.get(new Partition(48)).getUpcomingOffset) // Validate that a fetch will return the message. -val connector = getConsumerConnector -var stream = connector.createMessageStreams(Map(TOPIC -> 1))(TOPIC).head.iterator -var message = stream.next -var text = new String(message.message, "UTF-8") -connector.shutdown +val consumer = getKafkaConsumer +consumer.subscribe(JavaConverters.setAsJavaSetConverter(Set(TOPIC)).asJava) +val records = consumer.poll(Duration.ofMillis(1000)) +consumer.close() + +assertTrue(records.count() > 2) Review comment: How come this checks for more than 2 messages? It looks like the test only produces 2 messages.
[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r267081332 ## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ## @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() { consumer.stop(); } + @Test + public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() { +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class); +KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER); +KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler(); + +final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET); + +// Mock the consumer interactions. +Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); + +// Invoke the consumer with startpoint. +kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific); + +// Mock verifications. +Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); + Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong()); + } + + @Test + public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() { +// Define dummy variables for testing. 
+final Long testTimeStamp = 10L; + +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class); +KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER); + +KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler(); + +final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp); +final Map offsetForTimesResult = ImmutableMap.of( +TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp)); + +// Mock the consumer interactions. + Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult); +Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); + +kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp); + +// Mock verifications. 
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); +Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap()); + Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong()); + } + + @Test + public void testStartpointTimestampVisitorShouldMoveTheConsumerToEndWhenTimestampDoesNotExist() { +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class); + + +KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER); +KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler(); + +final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(0L); +final Map offsetForTimesResult = new HashMap<>(); +offsetForTimesResult.put(TEST_TOPIC_PARTITION, null); + +// Mock the consumer interactions. + Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult); + +kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp); + +// Mock verifications. +Mockito.verify(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION)); +Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap()); + Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong()); + } + + @Test + public void testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() { +// Define dummy variables for testing. +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +final KafkaConsumerProxy kafkaConsumerProxy =
[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r267081142 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ## @@ -330,6 +325,88 @@ public String getSystemName() { return systemName; } + @VisibleForTesting + class KafkaStartpointRegistrationHandler implements StartpointVisitor { + +@Override +public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) { + TopicPartition topicPartition = toTopicPartition(systemStreamPartition); + long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset()); + LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint); + + // KafkaConsumer is not thread-safe. + synchronized (kafkaConsumer) { +kafkaConsumer.seek(topicPartition, offsetInStartpoint); + } + + // add the partition to the proxy + proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition)); Review comment: Moved all the kafkaConsumer invocations into a lock.
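In the quoted hunk, `seek` runs inside the `synchronized` block while `position` is read outside it. The change described in the reply, holding the lock across every consumer call, can be sketched as follows. `FakeConsumer` is a hypothetical stand-in for the non-thread-safe `KafkaConsumer`, and `seekAndReadPosition` is an illustrative name; neither is Samza's or Kafka's actual code.

```java
import java.util.HashMap;
import java.util.Map;

final class SeekUnderLockSketch {
  /** Hypothetical stand-in for a consumer that is not safe for concurrent use. */
  static final class FakeConsumer {
    private final Map<String, Long> positions = new HashMap<>();

    void seek(String topicPartition, long offset) {
      positions.put(topicPartition, offset);
    }

    long position(String topicPartition) {
      return positions.getOrDefault(topicPartition, 0L);
    }
  }

  private final FakeConsumer consumer;

  SeekUnderLockSketch(FakeConsumer consumer) {
    this.consumer = consumer;
  }

  // Both calls happen under the same monitor, so no other thread that also
  // synchronizes on the consumer can interleave between seek and position.
  long seekAndReadPosition(String topicPartition, long offset) {
    synchronized (consumer) {
      consumer.seek(topicPartition, offset);
      return consumer.position(topicPartition);
    }
  }

  public static void main(String[] args) {
    SeekUnderLockSketch handler = new SeekUnderLockSketch(new FakeConsumer());
    System.out.println(handler.seekAndReadPosition("topic-0", 42L));
  }
}
```

The value read under the lock can then be handed to the proxy after the block exits, which keeps the critical section limited to the consumer calls.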
[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r267052376 ## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ## @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() { consumer.stop(); } + @Test + public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() { +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class); +KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER); +KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler(); Review comment: Done.
[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r267053644 ## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ## @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() { consumer.stop(); } + @Test + public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() { +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class); +KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER); +KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler(); + +final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET); + +// Mock the consumer interactions. +Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); + +// Invoke the consumer with startpoint. +kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific); + +// Mock verifications. +Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); + Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong()); Review comment: Done.
[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267054111

## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ##
@@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
     consumer.stop();
   }

+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+    KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+    final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET);
+
+    // Mock the consumer interactions.
+    Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+
+    // Invoke the consumer with startpoint.
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong());
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Long testTimeStamp = 10L;
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+
+    KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+    final Map offsetForTimesResult = ImmutableMap.of(
+        TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+    Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+    Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());

Review comment:
It is better to verify all the mock interactions. I prefer to keep it.
[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267022260

## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ##
@@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
     consumer.stop();
   }

+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+    KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+    final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET);
+
+    // Mock the consumer interactions.
+    Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+
+    // Invoke the consumer with startpoint.
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong());
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Long testTimeStamp = 10L;
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+
+    KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+    final Map offsetForTimesResult = ImmutableMap.of(
+        TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+    Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+    Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong());
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldMoveTheConsumerToEndWhenTimestampDoesNotExist() {
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+
+
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+    KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(0L);
+    final Map offsetForTimesResult = new HashMap<>();
+    offsetForTimesResult.put(TEST_TOPIC_PARTITION, null);
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION));
+    Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong());
+  }
+
+  @Test
+  public void testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy =
[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267054141

## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ##
@@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
     consumer.stop();
   }

+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+    KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+    final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET);
+
+    // Mock the consumer interactions.
+    Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+
+    // Invoke the consumer with startpoint.
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong());
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Long testTimeStamp = 10L;
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+
+    KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+    final Map offsetForTimesResult = ImmutableMap.of(
+        TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+    Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+    Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong());

Review comment:
Done.
[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267056681

## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ##
@@ -119,9 +136,11 @@ public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
     consumer.register(ssp1, "3");
     consumer.register(ssp2, "0");
-    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
-    assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
-    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
+    consumer.start();
+
+    assertEquals("5", ((StartpointSpecific) consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp0))).getSpecificOffset());

Review comment:
Fixed.
[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267053676

## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ##
@@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
     consumer.stop();
   }

+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+    KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+    final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET);
+
+    // Mock the consumer interactions.
+    Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+
+    // Invoke the consumer with startpoint.
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong());
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Long testTimeStamp = 10L;
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+
+    KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+    final Map offsetForTimesResult = ImmutableMap.of(
+        TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);

Review comment:
Done.
Re: [VOTE] Apache Samza 1.1.0 RC2
Sorry for the late reply. I've run into a problem with the performance tests:

./gradlew samza-shell:kvPerformanceTest -PconfigPath=file://$PWD/samza-test/src/main/config/perf/kv-perf.properties

I think it is not a code problem but a test problem, specifically a test configuration problem.

On Mon, Mar 18, 2019 at 5:10 PM santhosh venkat <santhoshvenkat1...@gmail.com> wrote:

> Hi,
>
> The vote of Samza 1.1.0 has been open for more than 72 hours. We got +1 (binding)x 3 and +1 (non-binding) x 3 and no vetos.
>
> *Binding +1: Prateek M, Jagadish V, Jake Maes*
> *Non-binding +1: Rayman P, Daniel C, Shanthoosh V*
>
> Thanks everyone for helping validate the release. Samza 1.1.0 has officially passed the VOTE.
>
> Thanks,
> Shanthoosh
>
> On Mon, Mar 18, 2019 at 4:32 PM Prateek Maheshwari wrote:
>
> > 1. Verified checksum and signatures for the binaries.
> > 2. Ran ./check-all.sh
> > 3. Ran YARN and Standalone integration tests with the config patch successfully.
> >
> > +1(binding) from my side as well.
> >
> > Thanks,
> > Prateek
> >
> > On Mon, Mar 18, 2019 at 2:06 PM Jagadish Venkatraman <jagadish1...@gmail.com> wrote:
> >
> > > 1. Verified check-sum and signatures for the release binaries.
> > > 2. Ran ./check-all.sh successfully
> > > 3. Ran YARN integration tests successfully
> > > 4. Encountered an error on the standalone integration test, but it succeeded after setting Kafka's replication factor config to 1.
> > >
> > > +1(binding) from my side.
> > >
> > > Thanks Daniel Chen and Shanthoosh for shepherding Samza 1.0.1!
> > >
> > > On Mon, Mar 18, 2019 at 9:47 AM Jake Maes wrote:
> > >
> > > > Verified with check-all on RHEL 7
> > > >
> > > > Verified pgp and sha.
> > > >
> > > > +1 (binding)
> > > >
> > > > On Fri, Mar 15, 2019 at 11:39 AM rayman preet wrote:
> > > >
> > > > > +1 (Non-binding)
> > > > >
> > > > > --
> > > > > thanks
> > > > > rayman
> > > > >
> > > > > On Wed, Mar 13, 2019 at 7:17 PM Daniel Chen wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I performed the following verifications:
> > > > > >
> > > > > > 1. ./bin/check-all.sh succeeded.
> > > > > >
> > > > > > 2. Verified both ./bin/integration-tests.sh yarn-integration-tests and ./bin/integration-tests.sh standalone-integration-tests succeeded.
> > > > > >
> > > > > > 3. Verified that SQL console available in samza-tool.tgz.
> > > > > >
> > > > > > +1 (Non-binding)
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Daniel
> > > > > >
> > > > > > On Tue, Mar 12, 2019 at 4:11 PM santhosh venkat <santhoshvenkat1...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > This is a call for a vote on a release of Apache Samza 1.1.0. Thanks to everyone who has contributed to this release.
> > > > > > >
> > > > > > > The release candidate can be downloaded from here:
> > > > > > > http://home.apache.org/~shanthoosh/samza-1.1.0-rc2/
> > > > > > >
> > > > > > > The release candidate is signed with pgp key 0xF8B95961A401BF0F, which can be found
> > > > > > >
> > > > > > > http://keyserver.ubuntu.com/pks/lookup?op=get=0xF8B95961A401BF0F
> > > > > > >
> > > > > > > The git tag is release-1.1.0-rc0 and signed with the same pgp key:
> > > > > > >
> > > > > > > https://gitbox.apache.org/repos/asf?p=samza.git;a=tag;h=refs/tags/release-1.1.0-rc2
> > > > > > >
> > > > > > > Test binaries have been published to Maven's staging repository, and are available here:
> > > > > > >
> > > > > > > https://repository.apache.org/content/repositories/orgapachesamza-1060/
> > > > > > >
> > > > > > > The vote will be open for 72 hours (ending at 16:30 PM PST Thursday, 03/15/2018).
> > > > > > >
> > > > > > > Please download the release candidate, check the hashes/signature, build it and test it, and then please vote:
> > > > > > >
> > > > > > > [ ] +1 approve
> > > > > > >
> > > > > > > [ ] +0 no opinion
> > > > > > >
> > > > > > > [ ] -1 disapprove (and reason why)
> > > > > > >
> > > > > > > I ran check-all.sh, integration tests and verified the SQL console in samza-tool tgz.
> > > > > > >
> > > > > > > +1 (non-binding) from my side.
> > > > > > >
> > > > > > > Thanks,
> > > > >
> > > > > --
> > > > > thanks
> > > > > rayman
> > >
> > > --
> > > Jagadish V,
> > > Graduate Student,
> > > Department of Computer Science,
> > > Stanford University
[GitHub] [samza-beam-examples] xinyuiscool merged pull request #2: Update scripts and docs based on Beam 2.11.0 release
xinyuiscool merged pull request #2: Update scripts and docs based on Beam 2.11.0 release URL: https://github.com/apache/samza-beam-examples/pull/2
[GitHub] [samza] asfgit closed pull request #958: SAMZA-2117: Handle race condition in container launch due to incorrect AM accounting
asfgit closed pull request #958: SAMZA-2117: Handle race condition in container launch due to incorrect AM accounting URL: https://github.com/apache/samza/pull/958
[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267008613

## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ##
@@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
     consumer.stop();
   }

+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+    KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();

Review comment:
It seems kind of awkward to test like this, where you put in the mock registration handler, and then create a new one to actually test with. Could you make `KafkaStartpointRegistrationHandler` a static inner class and then pass the necessary objects to it? In my opinion, non-static inner classes are harder to read/maintain anyways.
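A minimal sketch of the static-nested-class suggestion in the comment above, not code from the pull request. The interfaces `OffsetSeeker` and `PartitionRegistry` and the class `StartpointHandler` are hypothetical stand-ins for `KafkaConsumer`, `KafkaConsumerProxy`, and `KafkaStartpointRegistrationHandler`. The only point it tries to show is that a static nested class receives its collaborators through its constructor, so a test can build it directly from fakes without first constructing the outer consumer.

```java
import java.util.ArrayList;
import java.util.List;

class StaticHandlerSketch {
  // Hypothetical stand-in for the part of KafkaConsumer the handler needs.
  interface OffsetSeeker {
    void seek(String topicPartition, long offset);
  }

  // Hypothetical stand-in for KafkaConsumerProxy.
  interface PartitionRegistry {
    void addTopicPartition(String topicPartition, long nextOffset);
  }

  // Static nested class: it holds no hidden reference to an enclosing instance,
  // so every dependency is visible in the constructor signature.
  static final class StartpointHandler {
    private final OffsetSeeker seeker;
    private final PartitionRegistry registry;

    StartpointHandler(OffsetSeeker seeker, PartitionRegistry registry) {
      this.seeker = seeker;
      this.registry = registry;
    }

    // Seeks to the given offset, then registers the partition at that offset.
    void visitSpecific(String topicPartition, String specificOffset) {
      long offset = Long.parseLong(specificOffset);
      seeker.seek(topicPartition, offset);
      registry.addTopicPartition(topicPartition, offset);
    }
  }

  // Builds the handler from two recording fakes and returns the calls it made.
  static List<String> demo() {
    List<String> calls = new ArrayList<>();
    StartpointHandler handler = new StartpointHandler(
        (tp, offset) -> calls.add("seek " + tp + " " + offset),
        (tp, offset) -> calls.add("add " + tp + " " + offset));
    handler.visitSpecific("topic-0", "5");
    return calls;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [seek topic-0 5, add topic-0 5]
  }
}
```

With this shape a test needs only the two collaborators, which avoids the pattern the comment calls awkward: passing one handler into the outer object and then creating a second one to exercise.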
[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267001831

## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ##
@@ -119,9 +136,11 @@ public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
     consumer.register(ssp1, "3");
     consumer.register(ssp2, "0");
-    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
-    assertEquals("2", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
-    assertEquals("0", consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
+    consumer.start();
+
+    assertEquals("5", ((StartpointSpecific) consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp0))).getSpecificOffset());

Review comment:
The name of the test doesn't seem to match the actual test logic, since the oldest offset is no longer being used.
[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267009407

## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ##
@@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
     consumer.stop();
   }

+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+    KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+    final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET);
+
+    // Mock the consumer interactions.
+    Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+
+    // Invoke the consumer with startpoint.
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong());
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Long testTimeStamp = 10L;
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+
+    KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+    final Map offsetForTimesResult = ImmutableMap.of(
+        TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+    Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+    Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong());

Review comment:
Same as above about not using "any".
[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267009990

## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ##
@@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
     consumer.stop();
   }

+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+    KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+    final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET);
+
+    // Mock the consumer interactions.
+    Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+
+    // Invoke the consumer with startpoint.
+    kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
+
+    // Mock verifications.
+    Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+    Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong());
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+    // Define dummy variables for testing.
+    final Long testTimeStamp = 10L;
+
+    final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+    final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+    KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+
+    KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+    final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+    final Map offsetForTimesResult = ImmutableMap.of(
+        TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+    // Mock the consumer interactions.
+    Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);

Review comment:
We should be able to check a specific argument instead of using `anyMap`, right?
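To illustrate why the comment above asks for a specific argument, here is a small library-free sketch; it is not code from the pull request and does not use Mockito. `RecordingLookup`, `looseCheck`, and `strictCheck` are hypothetical names: the fake records the map it was called with, the loose check accepts any recorded map (roughly what an `anyMap` matcher asserts), and the strict check compares against the exact expected map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class ExactArgumentSketch {
  // Hypothetical fake that records the argument of its last lookup.
  // It stands in for a mocked consumer; it is not a Kafka or Mockito type.
  static final class RecordingLookup {
    Map<String, Long> lastRequest;

    Map<String, Long> offsetsForTimes(Map<String, Long> timestamps) {
      lastRequest = new HashMap<>(timestamps);
      return Collections.emptyMap();
    }
  }

  // Loose check: passes as long as some map was passed in.
  static boolean looseCheck(RecordingLookup lookup) {
    return lookup.lastRequest != null;
  }

  // Strict check: passes only when the recorded request equals the expected map.
  static boolean strictCheck(RecordingLookup lookup, Map<String, Long> expected) {
    return expected.equals(lookup.lastRequest);
  }

  public static void main(String[] args) {
    RecordingLookup lookup = new RecordingLookup();
    // Simulate a bug: the code under test asks for timestamp 0 instead of 10.
    lookup.offsetsForTimes(Collections.singletonMap("topic-0", 0L));
    Map<String, Long> expected = Collections.singletonMap("topic-0", 10L);

    System.out.println("loose check passes: " + looseCheck(lookup));            // true
    System.out.println("strict check passes: " + strictCheck(lookup, expected)); // false
  }
}
```

The loose check lets the wrong timestamp through, while the strict check flags it. In the test under review, the expected map would presumably pair `TEST_TOPIC_PARTITION` with `testTimeStamp`, which is the map the handler builds before calling `offsetsForTimes`.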
[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r266999014 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ## @@ -330,6 +325,88 @@ public String getSystemName() { return systemName; } + @VisibleForTesting + class KafkaStartpointRegistrationHandler implements StartpointVisitor { + +@Override +public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) { + TopicPartition topicPartition = toTopicPartition(systemStreamPartition); + long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset()); + LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint); + + // KafkaConsumer is not thread-safe. + synchronized (kafkaConsumer) { +kafkaConsumer.seek(topicPartition, offsetInStartpoint); + } + + // add the partition to the proxy + proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition)); +} + +@Override +public void visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) { + Long timestampInStartpoint = startpointTimestamp.getTimestampOffset(); + TopicPartition topicPartition = toTopicPartition(systemStreamPartition); + Map topicPartitionsToTimeStamps = ImmutableMap.of(topicPartition, timestampInStartpoint); + + // Look up the offset by timestamp. + LOG.info("Looking up the offsets of the topic partition: {} by timestamp: {}.", topicPartition, timestampInStartpoint); + Map topicPartitionToOffsetTimestamps = kafkaConsumer.offsetsForTimes(topicPartitionsToTimeStamps); Review comment: Does this need to be protected by the `kafkaConsumer` lock?
[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r267010766 ## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ## @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() { consumer.stop(); } + @Test + public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() { +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class); +KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER); +KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler(); + +final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET); + +// Mock the consumer interactions. +Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); + +// Invoke the consumer with startpoint. +kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific); + +// Mock verifications. +Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); + Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong()); + } + + @Test + public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() { +// Define dummy variables for testing. 
+final Long testTimeStamp = 10L; + +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class); +KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER); + +KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler(); + +final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp); +final Map offsetForTimesResult = ImmutableMap.of( +TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp)); + +// Mock the consumer interactions. + Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult); +Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); + +kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp); + +// Mock verifications. 
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); +Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap()); + Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong()); + } + + @Test + public void testStartpointTimestampVisitorShouldMoveTheConsumerToEndWhenTimestampDoesNotExist() { +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class); + + +KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER); +KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler(); + +final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(0L); +final Map offsetForTimesResult = new HashMap<>(); +offsetForTimesResult.put(TEST_TOPIC_PARTITION, null); + +// Mock the consumer interactions. + Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult); + +kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp); + +// Mock verifications. +Mockito.verify(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION)); +Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap()); + Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong()); + } + + @Test + public void testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() { +// Define dummy variables for testing. +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +final KafkaConsumerProxy kafkaConsumerProxy =
[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r267009293 ## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ## @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() { consumer.stop(); } + @Test + public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() { +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class); +KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER); +KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler(); + +final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET); + +// Mock the consumer interactions. +Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); + +// Invoke the consumer with startpoint. +kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific); + +// Mock verifications. +Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); + Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong()); Review comment: Can this verify that the correct `SystemStreamPartition` and offset get used, instead of just doing "any"?
[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r267010415 ## File path: samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java ## @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() { consumer.stop(); } + @Test + public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() { +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class); +KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER); +KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler(); + +final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET); + +// Mock the consumer interactions. +Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); + +// Invoke the consumer with startpoint. +kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific); + +// Mock verifications. +Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); + Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong()); + } + + @Test + public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() { +// Define dummy variables for testing. 
+final Long testTimeStamp = 10L; + +final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class); +final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class); +KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER); + +KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler(); + +final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp); +final Map offsetForTimesResult = ImmutableMap.of( +TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp)); + +// Mock the consumer interactions. + Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult); +Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); + +kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp); + +// Mock verifications. +Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET)); +Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap()); Review comment: I don't think `offsetsForTimes` has side effects, so you probably don't need to verify this particular call. The test won't work if it doesn't get called anyway.
[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#discussion_r266996590 ## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java ## @@ -330,6 +325,88 @@ public String getSystemName() { return systemName; } + @VisibleForTesting + class KafkaStartpointRegistrationHandler implements StartpointVisitor { + +@Override +public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) { + TopicPartition topicPartition = toTopicPartition(systemStreamPartition); + long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset()); + LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint); + + // KafkaConsumer is not thread-safe. + synchronized (kafkaConsumer) { +kafkaConsumer.seek(topicPartition, offsetInStartpoint); + } + + // add the partition to the proxy + proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition)); Review comment: Does `kafkaConsumer.position` need to be protected by the lock too? Also, please check below for other usages of `kafkaConsumer` outside of the lock.
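For illustration only, the locking concern raised in this review comment can be sketched as below. This is a minimal sketch, not the code from the PR: `GuardedConsumerSketch`, `FakeConsumer`, and `seekAndReadPosition` are hypothetical names, and `FakeConsumer` is a stand-in for a non-thread-safe client such as `KafkaConsumer`. The point it shows is that both the `seek` and the subsequent `position` read happen while holding the same lock, and only the resulting value leaves the synchronized block.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; none of these names come from the Samza code base.
class GuardedConsumerSketch {

    // Stand-in for a client that is not safe for concurrent use.
    static class FakeConsumer {
        private final Map<String, Long> positions = new HashMap<>();

        void seek(String partition, long offset) {
            positions.put(partition, offset);
        }

        long position(String partition) {
            return positions.get(partition);
        }
    }

    private final FakeConsumer consumer = new FakeConsumer();

    // Seeks and reads the position back under one lock, then returns the value
    // so the caller can hand it to another component outside the lock.
    long seekAndReadPosition(String partition, long offset) {
        long position;
        synchronized (consumer) {
            consumer.seek(partition, offset);
            position = consumer.position(partition);
        }
        return position;
    }

    public static void main(String[] args) {
        GuardedConsumerSketch sketch = new GuardedConsumerSketch();
        System.out.println(sketch.seekAndReadPosition("topic-0", 42L));
    }
}
```

Reading the position into a local variable inside the block keeps the critical section small while still ensuring that no consumer call runs unguarded.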
[GitHub] [samza] prateekm commented on issue #899: SAMZA-2091: Update ZkClient in samza-standalone to use string serializer
prateekm commented on issue #899: SAMZA-2091: Update ZkClient in samza-standalone to use string serializer URL: https://github.com/apache/samza/pull/899#issuecomment-474469431 @shanthoosh Please merge if you don't need anything else for this PR.
[GitHub] [samza] dnishimura commented on a change in pull request #956: SAMZA-2132: Flatten startpoint key when serialized.
dnishimura commented on a change in pull request #956: SAMZA-2132: Flatten startpoint key when serialized. URL: https://github.com/apache/samza/pull/956#discussion_r266984446 ## File path: samza-core/src/main/java/org/apache/samza/startpoint/StartpointKey.java ## @@ -41,12 +41,29 @@ this.taskName = taskName; } + @JsonIgnore Review comment: @shanthoosh good idea. I'm always for being explicit. Please see the latest commit.
[GitHub] [samza] shanthoosh commented on issue #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
shanthoosh commented on issue #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#issuecomment-474441844 @dnishimura No, I tried it locally multiple times but was not able to reproduce it. That failure is orthogonal to this change; it originates from the API `KafkaSystemAdmin.createStream()`.
[GitHub] [samza] dnishimura commented on issue #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.
dnishimura commented on issue #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer. URL: https://github.com/apache/samza/pull/918#issuecomment-474414111 Is the timeout error shown in Travis-CI just a transient thing and not reproducible locally? ```testShouldCreateCoordinatorStream FAILED org.apache.samza.SamzaException: Creation of topic test-coordinator-stream failed. Caused by: java.util.concurrent.TimeoutException```
[GitHub] [samza] vjagadish1989 opened a new pull request #958: SAMZA-2117: Handle race condition in container launch due to incorrect AM accounting
vjagadish1989 opened a new pull request #958: SAMZA-2117: Handle race condition in container launch due to incorrect AM accounting URL: https://github.com/apache/samza/pull/958 Steps involved when starting a Samza container: 1. Issue a request to YARN to launch a container. 2. Record that container as "pending" launch. 3. The launch callback succeeds on a different thread: the callback looks at the "pending" container and marks it as "running". The race condition in the above: if the main thread gets pre-empted between (1) and (2), the callback thread does not see the container state as "pending", so it does not transition it to the "running" state. This PR fixes it by flipping (1) and (2), i.e., recording the intent prior to issuing the launch request. Added a unit test and refactored existing tests.
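The ordering fix described in this PR can be sketched roughly as follows. This is a hedged illustration under stated assumptions, not the actual Samza or YARN code: `LaunchOrderingSketch`, `Launcher`, `requestLaunch`, and `onLaunchSuccess` are hypothetical names, and the launcher in `main` invokes the callback synchronously to simulate the worst case, where the callback fires before the launch call returns.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; these names are illustrative and not taken from Samza.
class LaunchOrderingSketch {

    enum State { PENDING, RUNNING }

    // Stand-in for an asynchronous launch request whose success callback
    // may run on another thread at any time after the request is issued.
    interface Launcher {
        void requestLaunch(String containerId, Runnable onSuccess);
    }

    private final Map<String, State> containers = new ConcurrentHashMap<>();

    void launch(String containerId, Launcher launcher) {
        // Record the intent first, so the callback always finds a PENDING entry.
        containers.put(containerId, State.PENDING);
        // Only then issue the launch request.
        launcher.requestLaunch(containerId, () -> onLaunchSuccess(containerId));
    }

    void onLaunchSuccess(String containerId) {
        // Transition to RUNNING only if the container is currently PENDING.
        containers.replace(containerId, State.PENDING, State.RUNNING);
    }

    State stateOf(String containerId) {
        return containers.get(containerId);
    }

    public static void main(String[] args) {
        LaunchOrderingSketch sketch = new LaunchOrderingSketch();
        // The callback runs immediately, before requestLaunch returns.
        sketch.launch("container-1", (id, onSuccess) -> onSuccess.run());
        System.out.println(sketch.stateOf("container-1")); // prints RUNNING
    }
}
```

With the two statements in `launch` swapped, the same immediate callback would find no "pending" entry, the transition would be skipped, and the container would then be recorded as pending indefinitely, which is the race the description outlines.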
[DISCUSS] SEP-21: Samza Async API for High Level
Hi all, I created SEP-21: Samza Async API for High Level. Please find the SEP wiki below: https://cwiki.apache.org/confluence/display/SAMZA/SEP-21%3A+Samza+Async+API+for+High+Level Please take a look and chime in with your feedback. Thanks, Bharath