[GitHub] [samza] Sanil15 commented on a change in pull request #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner

2019-03-19 Thread GitBox
Sanil15 commented on a change in pull request #962: SAMZA-2135: Provide a way 
inject ExternalContext to TestRunner
URL: https://github.com/apache/samza/pull/962#discussion_r267150940
 
 

 ##
 File path: 
samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
 ##
 @@ -263,4 +268,16 @@ public void genData(Map> 
inputPartitionData, Map

[GitHub] [samza] cameronlee314 commented on a change in pull request #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #962: SAMZA-2135: Provide a 
way inject ExternalContext to TestRunner
URL: https://github.com/apache/samza/pull/962#discussion_r267149842
 
 

 ##
 File path: 
samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
 ##
 @@ -263,4 +268,16 @@ public void genData(Map> 
inputPartitionData, Map

[GitHub] [samza] Sanil15 commented on a change in pull request #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner

2019-03-19 Thread GitBox
Sanil15 commented on a change in pull request #962: SAMZA-2135: Provide a way 
inject ExternalContext to TestRunner
URL: https://github.com/apache/samza/pull/962#discussion_r267147519
 
 

 ##
 File path: 
samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
 ##
 @@ -263,4 +268,10 @@ public void genData(Map> 
inputPartitionData, Map

[GitHub] [samza] xinyuiscool merged pull request #944: Release version updates

2019-03-19 Thread GitBox
xinyuiscool merged pull request #944: Release version updates
URL: https://github.com/apache/samza/pull/944
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [samza] cameronlee314 commented on a change in pull request #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #962: SAMZA-2135: Provide a 
way inject ExternalContext to TestRunner
URL: https://github.com/apache/samza/pull/962#discussion_r267146513
 
 

 ##
 File path: 
samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
 ##
 @@ -263,4 +268,10 @@ public void genData(Map> 
inputPartitionData, Map

[GitHub] [samza] Sanil15 commented on issue #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner

2019-03-19 Thread GitBox
Sanil15 commented on issue #962: SAMZA-2135: Provide a way inject 
ExternalContext to TestRunner
URL: https://github.com/apache/samza/pull/962#issuecomment-474632748
 
 
   @cameronlee314 please have a look!


[GitHub] [samza] Sanil15 opened a new pull request #962: SAMZA-2135: Provide a way inject ExternalContext to TestRunner

2019-03-19 Thread GitBox
Sanil15 opened a new pull request #962: SAMZA-2135: Provide a way inject 
ExternalContext to TestRunner
URL: https://github.com/apache/samza/pull/962
 
 
   - Added a public API to pass ExternalContext to TestRunner
   - Refactored existing tests to test it 


[GitHub] [samza] dxichen commented on issue #944: Release version updates

2019-03-19 Thread GitBox
dxichen commented on issue #944: Release version updates
URL: https://github.com/apache/samza/pull/944#issuecomment-474626523
 
 
   @shanthoosh @jagadish-v0 @prateekm Please take a look 


[GitHub] [samza] dxichen commented on a change in pull request #944: Release version updates

2019-03-19 Thread GitBox
dxichen commented on a change in pull request #944: Release version updates
URL: https://github.com/apache/samza/pull/944#discussion_r267139268
 
 

 ##
 File path: docs/learn/tutorials/versioned/samza-rest-getting-started.md
 ##
 @@ -48,7 +48,7 @@ Run the following commands:
 {% highlight bash %}
 cd samza-rest/build/distributions/
 mkdir -p deploy/samza-rest
-tar -xvf ./samza-rest-1.0.1-SNAPSHOT.tgz -C deploy/samza-rest
+tar -xvf ./samza-rest-1.1.1-SNAPSHOT.tgz -C deploy/samza-rest
 
 Review comment:
   1. Since this is on master, I believe it should be the SNAPSHOT version of the next release (i.e. 1.1.1-SNAPSHOT) instead of the latest released version.
   2. Following the convention of previous releases, master should be versioned as a SNAPSHOT so that it is picked up in Maven local.


[GitHub] [samza] dengpanyin commented on issue #961: SAMZA-2134:Enable table rate limiter by default.

2019-03-19 Thread GitBox
dengpanyin commented on issue #961: SAMZA-2134:Enable table rate limiter by 
default.
URL: https://github.com/apache/samza/pull/961#issuecomment-474618197
 
 
   @weisong44 Can you help review this RB? Thanks.


[GitHub] [samza] dengpanyin opened a new pull request #961: SAMZA-2134:Enable table rate limiter by default.

2019-03-19 Thread GitBox
dengpanyin opened a new pull request #961: SAMZA-2134:Enable table rate limiter 
by default.
URL: https://github.com/apache/samza/pull/961
 
 
   By default, the rate limiter will be enabled. The user needs to either provide a rate limiter for the
   table descriptor or explicitly disable the read/write rate limiter.


[GitHub] [samza] dxichen opened a new pull request #960: Update website for 1.1.0

2019-03-19 Thread GitBox
dxichen opened a new pull request #960: Update website for 1.1.0
URL: https://github.com/apache/samza/pull/960
 
 
   @vjagadish @shanthoosh  Followed the instructions as per 
https://github.com/apache/samza/blob/master/docs/README.md#release-new-version-website-checklist
   
   Please let me know if I need to update the download version with 1.1.0 as 
well.


[GitHub] [samza] shanthoosh merged pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
shanthoosh merged pull request #918: SAMZA-2094: Implement the 
StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918
 
 
   


[GitHub] [samza] shanthoosh merged pull request #956: SAMZA-2132: Flatten startpoint key when serialized.

2019-03-19 Thread GitBox
shanthoosh merged pull request #956: SAMZA-2132: Flatten startpoint key when 
serialized.
URL: https://github.com/apache/samza/pull/956
 
 
   


[GitHub] [samza] shanthoosh merged pull request #959: SAMZA-2133: Fix the TestKeyValuePerformance test.

2019-03-19 Thread GitBox
shanthoosh merged pull request #959: SAMZA-2133: Fix the 
TestKeyValuePerformance test.
URL: https://github.com/apache/samza/pull/959
 
 
   


[GitHub] [samza] shanthoosh commented on issue #959: SAMZA-2133: Fix the TestKeyValuePerformance test.

2019-03-19 Thread GitBox
shanthoosh commented on issue #959: SAMZA-2133: Fix the TestKeyValuePerformance 
test.
URL: https://github.com/apache/samza/pull/959#issuecomment-474583030
 
 
   Ran the test case locally after this fix to verify that the failure does not happen again.
   @sborya @dnishimura Please take a look.


[GitHub] [samza] shanthoosh opened a new pull request #959: SAMZA-2133: Fix the TestKeyValuePerformance test.

2019-03-19 Thread GitBox
shanthoosh opened a new pull request #959: SAMZA-2133: Fix the 
TestKeyValuePerformance test.
URL: https://github.com/apache/samza/pull/959
 
 
   Running the TestKeyValuePerformance test results in the following failure:
   
   ```
   2019-03-19 11:16:36.713 [main] TestKeyValuePerformance$ [INFO] Running test: rocksdb-write-performance
   Exception in thread "main" java.lang.IllegalArgumentException: Job name is not defined in configuration
       at org.apache.samza.context.JobContextImpl.fromConfigWithDefaults(JobContextImpl.java:49)
       at org.apache.samza.test.performance.TestKeyValuePerformance$$anonfun$invokeTest$2$$anonfun$apply$1.apply$mcVI$sp(TestKeyValuePerformance.scala:126)
       at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
       at org.apache.samza.test.performance.TestKeyValuePerformance$$anonfun$invokeTest$2.apply(TestKeyValuePerformance.scala:114)
       at org.apache.samza.test.performance.TestKeyValuePerformance$$anonfun$invokeTest$2.apply(TestKeyValuePerformance.scala:112)
       at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
       at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
       at org.apache.samza.test.performance.TestKeyValuePerformance$.invokeTest(TestKeyValuePerformance.scala:112)
       at org.apache.samza.test.performance.TestKeyValuePerformance$$anonfun$main$1.apply(TestKeyValuePerformance.scala:80)
       at org.apache.samza.test.performance.TestKeyValuePerformance$$anonfun$main$1.apply(TestKeyValuePerformance.scala:77)
       at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
       at org.apache.samza.test.performance.TestKeyValuePerformance$.main(TestKeyValuePerformance.scala:77)
       at org.apache.samza.test.performance.TestKeyValuePerformance.main(TestKeyValuePerformance.scala)
   :samza-shell:kvPerformanceTest FAILED
   
   ```
   
   Through SAMZA-1714, we added a verification in `JobContextImpl` that validates the presence of `JobId` and `JobName` in the configuration. These two configurations are not populated in the config of `TestKeyValuePerformance` before the test is launched.
   
   The fix is to add the two configurations, `JobId` and `JobName`, to the config before launching the test.
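The fix described above can be illustrated with a minimal sketch. This is not the code from the PR: `KvPerfConfigSketch`, its method names, and the placeholder values are hypothetical, and a plain `java.util.Map` stands in for Samza's config object. The keys `job.name` and `job.id` are Samza's standard configuration keys for the job name and job id.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the test's config setup; not the code from the PR.
final class KvPerfConfigSketch {
    // Samza's standard configuration keys for the job name and job id.
    static final String JOB_NAME = "job.name";
    static final String JOB_ID = "job.id";

    // Returns a copy of the given config with job.name and job.id filled in when absent.
    static Map<String, String> withJobIdentity(Map<String, String> base) {
        Map<String, String> config = new HashMap<>(base);
        config.putIfAbsent(JOB_NAME, "kv-performance-test"); // placeholder value
        config.putIfAbsent(JOB_ID, "1");                      // placeholder value
        return config;
    }

    // Mirrors the kind of presence check that fails in the stack trace above.
    static void requireJobName(Map<String, String> config) {
        if (!config.containsKey(JOB_NAME)) {
            throw new IllegalArgumentException("Job name is not defined in configuration");
        }
    }
}
```

With the two keys populated before the test is launched, a presence check of this kind passes instead of throwing `IllegalArgumentException`.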


[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade 
to Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r267088346
 
 

 ##
 File path: 
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
 ##
 @@ -93,48 +95,45 @@ object TestKafkaSystemAdmin extends KafkaServerTestHarness {
   }
 
   def createTopic(topicName: String, partitionCount: Int) {
-AdminUtils.createTopic(
-  zkUtils,
-  topicName,
-  partitionCount,
-  REPLICATION_FACTOR)
+createTopic(topicName, partitionCount, REPLICATION_FACTOR)
   }
 
   def validateTopic(topic: String, expectedPartitionCount: Int) {
 var done = false
 var retries = 0
-val maxRetries = 100
 
-while (!done && retries < maxRetries) {
+while (!done && retries < 100) {
   try {
-val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), SYSTEM, metadataStore.getTopicInfo)
-val topicMetadata = topicMetadataMap(topic)
+  if (!zkClient.topicExists(topic)) {
 
 Review comment:
   Looks like indentation might be off?


[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade 
to Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r267095405
 
 

 ##
 File path: 
samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
 ##
 @@ -257,23 +250,27 @@ class StreamTaskTestUtil {
 props.put("group.id", group)
 props.put("auto.offset.reset", "smallest")
 
-val consumerConfig = new ConsumerConfig(props)
-val consumerConnector = Consumer.create(consumerConfig)
-val stream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head.iterator
-var message: MessageAndMetadata[Array[Byte], Array[Byte]] = null
+val consumerConnector = new KafkaConsumer(props)
+consumerConnector.subscribe(Set(topic).asJava)
+
+var stream = consumerConnector.poll(Duration.ofMillis(100)).iterator()
+var message: ConsumerRecord[Nothing, Nothing] = null
 var messages = ArrayBuffer[String]()
 
 while (message == null || message.offset < maxOffsetInclusive) {
-  message = stream.next
-  if (message.message == null) {
-messages += null
+  if (stream.hasNext) {
+message = stream.next
+if (message.value() == null) {
+  messages += null
+} else {
+  messages += new String(message.value, "UTF-8")
+}
+System.err.println("StreamTaskTestUtil.readAll(): offset=%s, message=%s" format (message.offset, messages.last))
 
 Review comment:
   I know this was already here, but it doesn't seem like an error. Could it 
just be removed, or at least changed to System.out?


[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade 
to Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r267086592
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
 ##
 @@ -275,11 +280,62 @@ public void testClearStream() {
   TopicDescription td = dtr.all().get().get(topicName);
   Assert.fail("topic " + topicName + " should've been removed. td=" + td);
 } catch (Exception e) {
-  if (e.getCause() instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException) {
-// expected
-  } else {
+  if (!(e.getCause() instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException)) {
 Assert.fail("topic " + topicName + " should've been removed. Expected UnknownTopicOrPartitionException.");
   }
 }
   }
+
+  @Test
+  public void testShouldAssembleMetadata () {
+Map oldestOffsets = new ImmutableMap.Builder()
+.put(new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)), "o1")
+.put(new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)), "o2")
+.put(new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)), "o3")
+.put(new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)), "o4")
+.build();
+
 
 Review comment:
   Minor: extra whitespace


[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade 
to Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r267093231
 
 

 ##
 File path: 
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
 ##
 @@ -301,12 +260,6 @@ class TestKafkaSystemAdmin {
 val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka")
 systemAdmin.createStream(spec)
 validateTopic(topic, 1)
-val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
-assertTrue(topicMetadataMap.contains(topic))
-val topicMetadata = topicMetadataMap(topic)
-val partitionMetadata = topicMetadata.partitionsMetadata.head
-assertEquals(0, partitionMetadata.partitionId)
-assertEquals(3, partitionMetadata.replicas.size)
 
 Review comment:
   Is it difficult to verify this now?


[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade 
to Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r267059448
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
 ##
 @@ -717,4 +744,21 @@ private OffsetsMaps(Map oldestOffsets,
   return upcomingOffsets;
 }
   }
+
+  /**
+   * A helper class for represent changelog related information.
+   */
+  private static class ChangelogInfo {
+int replicationFactor;
 
 Review comment:
   Could you make this an immutable object with getters?
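A minimal sketch of what the requested shape could look like. The diff above only shows the `replicationFactor` field, so that is the only state modeled here; the class name `ChangelogInfoSketch` is hypothetical, and any other fields of the real `ChangelogInfo` are unknown and omitted.

```java
// Sketch only: models just the replicationFactor field visible in the diff.
final class ChangelogInfoSketch {
    // final field: assigned once in the constructor and never reassigned.
    private final int replicationFactor;

    ChangelogInfoSketch(int replicationFactor) {
        this.replicationFactor = replicationFactor;
    }

    // Read-only accessor; no setter is provided.
    int getReplicationFactor() {
        return replicationFactor;
    }
}
```

Because the only field is `final` and primitive, instances can be shared across threads without additional synchronization.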


[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade 
to Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r267087682
 
 

 ##
 File path: 
samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
 ##
 @@ -110,15 +110,12 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
 kcm1.stop
 
 // check that start actually creates the topic with log compaction enabled
-val zkClient = ZkUtils(zkConnect, 6000, 6000, JaasUtils.isZkSecurityEnabled())
-val topicConfig = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, checkpointTopic)
+val topicConfig = adminZkClient.getAllTopicConfigs().getOrElse(checkpointTopic, new Properties())
 
 Review comment:
   Just double checking: Does `adminZkClient` get managed (including lifecycle 
like `close`) through `KafkaServerTestHarness` now? 


[GitHub] [samza] cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #951: SAMZA-2127: Upgrade 
to Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r267091668
 
 

 ##
 File path: 
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
 ##
 @@ -257,17 +211,22 @@ class TestKafkaSystemAdmin {
 assertEquals("2", sspMetadata.get(new Partition(48)).getUpcomingOffset)
 
 // Validate that a fetch will return the message.
-val connector = getConsumerConnector
-var stream = connector.createMessageStreams(Map(TOPIC -> 1))(TOPIC).head.iterator
-var message = stream.next
-var text = new String(message.message, "UTF-8")
-connector.shutdown
+val consumer = getKafkaConsumer
+consumer.subscribe(JavaConverters.setAsJavaSetConverter(Set(TOPIC)).asJava)
+val records = consumer.poll(Duration.ofMillis(1000))
+consumer.close()
+
+assertTrue(records.count() > 2)
 
 Review comment:
   How come this checks for more than 2 messages? It looks like the test only 
produces 2 messages.


[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267081332
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointSpecific testStartpointSpecific = new StartpointSpecific(TEST_OFFSET);
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong());
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Long testTimeStamp = 10L;
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(testTimeStamp);
+final Map offsetForTimesResult = ImmutableMap.of(
+TEST_TOPIC_PARTITION, new OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+// Mock the consumer interactions.
+Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, Long.valueOf(TEST_OFFSET));
+Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong());
+  }
+
+  @Test
+  public void testStartpointTimestampVisitorShouldMoveTheConsumerToEndWhenTimestampDoesNotExist() {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+
+
+KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointTimestamp startpointTimestamp = new StartpointTimestamp(0L);
+final Map offsetForTimesResult = new HashMap<>();
+offsetForTimesResult.put(TEST_TOPIC_PARTITION, null);
+
+// Mock the consumer interactions.
+Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, startpointTimestamp);
+
+// Mock verifications.
+Mockito.verify(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION));
+Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class), Mockito.anyLong());
+  }
+
+  @Test
+  public void testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 

[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267081142
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -330,6 +325,88 @@ public String getSystemName() {
 return systemName;
   }
 
+  @VisibleForTesting
+  class KafkaStartpointRegistrationHandler implements StartpointVisitor {
+
+@Override
+public void visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
+  TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+  long offsetInStartpoint = Long.parseLong(startpointSpecific.getSpecificOffset());
+  LOG.info("Updating the consumer fetch offsets of topic partition: {} to {}.", topicPartition, offsetInStartpoint);
+
+  // KafkaConsumer is not thread-safe.
+  synchronized (kafkaConsumer) {
+kafkaConsumer.seek(topicPartition, offsetInStartpoint);
+  }
+
+  // add the partition to the proxy
+  proxy.addTopicPartition(systemStreamPartition, kafkaConsumer.position(topicPartition));
 
 Review comment:
   Moved all the kafkaConsumer invocations into a lock.
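In the quoted diff, `seek` runs inside the `synchronized` block while `position` is called outside it. The change described in this comment could look roughly like the sketch below. It is an illustration under stated assumptions, not the code from the PR: `FakeConsumer` and `StartpointHandlerSketch` are hypothetical stand-ins, partitions are modeled as plain strings, and a map plays the role of the proxy.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a client that is not thread-safe (the role KafkaConsumer plays in the diff).
final class FakeConsumer {
    private final Map<String, Long> positions = new HashMap<>();

    void seek(String partition, long offset) {
        positions.put(partition, offset);
    }

    long position(String partition) {
        return positions.getOrDefault(partition, 0L);
    }
}

// Hypothetical handler showing every consumer call guarded by the same monitor.
final class StartpointHandlerSketch {
    private final FakeConsumer consumer;
    private final Map<String, Long> proxyOffsets = new HashMap<>(); // stands in for the proxy

    StartpointHandlerSketch(FakeConsumer consumer) {
        this.consumer = consumer;
    }

    void visitSpecific(String partition, long offset) {
        long position;
        // Both consumer invocations happen while holding the consumer's monitor.
        synchronized (consumer) {
            consumer.seek(partition, offset);
            position = consumer.position(partition);
        }
        // The proxy is updated outside the lock, using the value read under it.
        proxyOffsets.put(partition, position);
    }

    Long registeredOffset(String partition) {
        return proxyOffsets.get(partition);
    }
}
```

Reading the position into a local variable inside the lock keeps the critical section limited to the consumer calls themselves.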


[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267052376
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
 
 Review comment:
   Done.
   


[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267053644
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void 
testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointSpecific testStartpointSpecific = new 
StartpointSpecific(TEST_OFFSET);
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
 
 Review comment:
   Done.




[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267054111
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void 
testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointSpecific testStartpointSpecific = new 
StartpointSpecific(TEST_OFFSET);
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
+  }
+
+  @Test
+  public void 
testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Long testTimeStamp = 10L;
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointTimestamp startpointTimestamp = new 
StartpointTimestamp(testTimeStamp);
+final Map offsetForTimesResult = 
ImmutableMap.of(
+TEST_TOPIC_PARTITION, new 
OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+// Mock the consumer interactions.
+
Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
startpointTimestamp);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
 
 Review comment:
   It is better to verify all the mock interactions. I prefer to keep it.
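
To make the point concrete, here is a minimal plain-Java sketch of the behaviour the timestamp tests in this thread appear to exercise: look up the offset for a timestamp, seek to it if one exists, and otherwise seek to the end. `OffsetConsumer`, `RecordingConsumer`, `visitTimestamp` and `callsFor` are hypothetical names invented for illustration; they are not the Kafka or Samza APIs, and the real handler may differ. The fake records every call, so a test can check the complete interaction list rather than a single call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TimestampStartpointSketch {

  /** Hypothetical, simplified stand-in for the consumer calls the tests mock. */
  interface OffsetConsumer {
    // Maps each partition to the first offset at or after the timestamp, or to null if none exists.
    Map<String, Long> offsetsForTimes(Map<String, Long> timestampsByPartition);

    void seek(String partition, long offset);

    void seekToEnd(String partition);
  }

  /** Resolves a timestamp startpoint: seek to the matching offset, else to the end. */
  static void visitTimestamp(OffsetConsumer consumer, String partition, long timestamp) {
    Map<String, Long> query = new HashMap<>();
    query.put(partition, timestamp);
    Long offset = consumer.offsetsForTimes(query).get(partition);
    if (offset == null) {
      consumer.seekToEnd(partition);
    } else {
      consumer.seek(partition, offset);
    }
  }

  /** Hand-written fake that records every call so a test can check all of them. */
  static final class RecordingConsumer implements OffsetConsumer {
    final List<String> calls = new ArrayList<>();
    private final Long cannedOffset;

    RecordingConsumer(Long cannedOffset) {
      this.cannedOffset = cannedOffset;
    }

    @Override
    public Map<String, Long> offsetsForTimes(Map<String, Long> timestampsByPartition) {
      calls.add("offsetsForTimes");
      Map<String, Long> result = new HashMap<>();
      for (String partition : timestampsByPartition.keySet()) {
        result.put(partition, cannedOffset); // a null value means "no offset for this timestamp"
      }
      return result;
    }

    @Override
    public void seek(String partition, long offset) {
      calls.add("seek(" + partition + ", " + offset + ")");
    }

    @Override
    public void seekToEnd(String partition) {
      calls.add("seekToEnd(" + partition + ")");
    }
  }

  /** Runs the visit against a fake and returns every recorded interaction in order. */
  static List<String> callsFor(Long cannedOffset) {
    RecordingConsumer consumer = new RecordingConsumer(cannedOffset);
    visitTimestamp(consumer, "test-topic-0", 10L);
    return consumer.calls;
  }

  public static void main(String[] args) {
    System.out.println(callsFor(5L));
    System.out.println(callsFor(null));
  }
}
```

Comparing the whole recorded list catches both a missing call and an unexpected extra one, which is the argument for keeping the additional verification.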




[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267022260
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void 
testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointSpecific testStartpointSpecific = new 
StartpointSpecific(TEST_OFFSET);
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
+  }
+
+  @Test
+  public void 
testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Long testTimeStamp = 10L;
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointTimestamp startpointTimestamp = new 
StartpointTimestamp(testTimeStamp);
+final Map offsetForTimesResult = 
ImmutableMap.of(
+TEST_TOPIC_PARTITION, new 
OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+// Mock the consumer interactions.
+
Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
startpointTimestamp);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
+  }
+
+  @Test
+  public void 
testStartpointTimestampVisitorShouldMoveTheConsumerToEndWhenTimestampDoesNotExist()
 {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+
+
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointTimestamp startpointTimestamp = new 
StartpointTimestamp(0L);
+final Map offsetForTimesResult = new 
HashMap<>();
+offsetForTimesResult.put(TEST_TOPIC_PARTITION, null);
+
+// Mock the consumer interactions.
+
Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
startpointTimestamp);
+
+// Mock verifications.
+Mockito.verify(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION));
+Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
+  }
+
+  @Test
+  public void 
testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 

[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267054141
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void 
testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointSpecific testStartpointSpecific = new 
StartpointSpecific(TEST_OFFSET);
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
+  }
+
+  @Test
+  public void 
testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Long testTimeStamp = 10L;
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointTimestamp startpointTimestamp = new 
StartpointTimestamp(testTimeStamp);
+final Map offsetForTimesResult = 
ImmutableMap.of(
+TEST_TOPIC_PARTITION, new 
OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+// Mock the consumer interactions.
+
Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
startpointTimestamp);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
 
 Review comment:
   Done.




[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267056681
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -119,9 +136,11 @@ public void 
testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
 consumer.register(ssp1, "3");
 consumer.register(ssp2, "0");
 
-assertEquals("0", 
consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
-assertEquals("2", 
consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
-assertEquals("0", 
consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
+consumer.start();
+
+assertEquals("5", ((StartpointSpecific) 
consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp0))).getSpecificOffset());
 
 Review comment:
   Fixed.




[GitHub] [samza] shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
shanthoosh commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267053676
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void 
testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointSpecific testStartpointSpecific = new 
StartpointSpecific(TEST_OFFSET);
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
+  }
+
+  @Test
+  public void 
testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Long testTimeStamp = 10L;
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointTimestamp startpointTimestamp = new 
StartpointTimestamp(testTimeStamp);
+final Map offsetForTimesResult = 
ImmutableMap.of(
+TEST_TOPIC_PARTITION, new 
OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+// Mock the consumer interactions.
+
Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
 
 Review comment:
   Done.




Re: [VOTE] Apache Samza 1.1.0 RC2

2019-03-19 Thread Boris S
Sorry for the late reply. I've run into a problem with the performance
tests:
./gradlew samza-shell:kvPerformanceTest
-PconfigPath=file://$PWD/samza-test/src/main/config/perf/kv-perf.properties

I think it is not a code problem but a test problem, specifically a test
configuration problem.

On Mon, Mar 18, 2019 at 5:10 PM santhosh venkat <
santhoshvenkat1...@gmail.com> wrote:

> Hi,
>
> The vote on Samza 1.1.0 has been open for more than 72 hours. We got +1
> (binding) x 3 and +1 (non-binding) x 3 and no vetoes.
>
> *Binding +1: Prateek M, Jagadish V, Jake Maes*
> *Non-binding +1: Rayman P, Daniel C, Shanthoosh V*
>
> Thanks everyone for helping validate the release. Samza 1.1.0 has
> officially passed the VOTE.
>
> Thanks,
> Shanthoosh
>
>
> On Mon, Mar 18, 2019 at 4:32 PM Prateek Maheshwari 
> wrote:
>
> > 1. Verified checksum and signatures for the binaries.
> > 2. Ran ./check-all.sh
> > 3. Ran YARN and Standalone integration tests with the config patch
> > successfully.
> >
> > +1(binding) from my side as well.
> >
> > Thanks,
> > Prateek
> >
> > On Mon, Mar 18, 2019 at 2:06 PM Jagadish Venkatraman <
> > jagadish1...@gmail.com>
> > wrote:
> >
> > > 1. Verified check-sum and signatures for the release binaries.
> > > 2. Ran ./check-all.sh successfully
> > > 3. Ran YARN integration tests successfully
> > > 4. Encountered an error on the standalone integration test, but it
> > > succeeded after setting Kafka's replication factor config to 1.
> > >
> > > +1(binding) from my side.
> > >
> > > Thanks Daniel Chen and Shanthoosh for shepherding Samza 1.0.1!
> > >
> > > On Mon, Mar 18, 2019 at 9:47 AM Jake Maes  wrote:
> > >
> > > > Verified with check-all on RHEL 7
> > > >
> > > > Verified pgp and sha.
> > > >
> > > > +1 (binding)
> > > >
> > > > On Fri, Mar 15, 2019 at 11:39 AM rayman preet 
> > > > wrote:
> > > >
> > > > > +1 (Non-binding)
> > > > >
> > > > > --
> > > > > thanks
> > > > > rayman
> > > > >
> > > > > On Wed, Mar 13, 2019 at 7:17 PM Daniel Chen 
> > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I performed the following verifications:
> > > > > >
> > > > > > 1. ./bin/check-all.sh succeeded.
> > > > > >
> > > > > > 2. Verified both ./bin/integration-tests.sh
> yarn-integration-tests
> > > and
> > > > > > ./bin/integration-tests.sh standalone-integration-tests
> succeeded.
> > > > > >
> > > > > > > 3. Verified that the SQL console is available in samza-tool.tgz.
> > > > > >
> > > > > > +1 (Non-binding)
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Daniel
> > > > > >
> > > > > >
> > > > > > On Tue, Mar 12, 2019 at 4:11 PM santhosh venkat <
> > > > > > santhoshvenkat1...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > This is a call for a vote on a release of Apache Samza 1.1.0.
> > > Thanks
> > > > to
> > > > > > > everyone who has contributed to this release.
> > > > > > >
> > > > > > > The release candidate can be downloaded from here:
> > > > > > > http://home.apache.org/~shanthoosh/samza-1.1.0-rc2/
> > > > > > >
> > > > > > > The release candidate is signed with pgp key
> 0xF8B95961A401BF0F,
> > > > which
> > > > > > can
> > > > > > > be found
> > > > > > >
> > > > >
> > >
> http://keyserver.ubuntu.com/pks/lookup?op=get&search=0xF8B95961A401BF0F
> > > > > > >
> > > > > > > The git tag is release-1.1.0-rc0 and signed with the same pgp
> > key:
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://gitbox.apache.org/repos/asf?p=samza.git;a=tag;h=refs/tags/release-1.1.0-rc2
> > > > > > >
> > > > > > > Test binaries have been published to Maven's staging
> repository,
> > > and
> > > > > are
> > > > > > > available here:
> > > > > > >
> > > > >
> > >
> https://repository.apache.org/content/repositories/orgapachesamza-1060/
> > > > > > >
> > > > > > > The vote will be open for 72 hours (ending at 16:30 PM PST
> > > Thursday,
> > > > > > > 03/15/2018).
> > > > > > >
> > > > > > > Please download the release candidate, check the
> > hashes/signature,
> > > > > build
> > > > > > it
> > > > > > > and test it, and then please vote:
> > > > > > >
> > > > > > > [ ] +1 approve
> > > > > > >
> > > > > > > [ ] +0 no opinion
> > > > > > >
> > > > > > > [ ] -1 disapprove (and reason why)
> > > > > > >
> > > > > > > I ran check-all.sh, integration tests and verified the SQL
> > console
> > > > > > > in samza-tool tgz.
> > > > > > >
> > > > > > > +1 (non-binding) from my side.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > thanks
> > > > > rayman
> > > > >
> > > >
> > >
> > >
> > > --
> > > Jagadish V,
> > > Graduate Student,
> > > Department of Computer Science,
> > > Stanford University
> > >
> >
>


[GitHub] [samza-beam-examples] xinyuiscool merged pull request #2: Update scripts and docs based on Beam 2.11.0 release

2019-03-19 Thread GitBox
xinyuiscool merged pull request #2: Update scripts and docs based on Beam 
2.11.0 release
URL: https://github.com/apache/samza-beam-examples/pull/2
 
 
   




[GitHub] [samza] asfgit closed pull request #958: SAMZA-2117: Handle race condition in container launch due to incorrect AM accounting

2019-03-19 Thread GitBox
asfgit closed pull request #958: SAMZA-2117: Handle race condition in container 
launch due to incorrect AM accounting
URL: https://github.com/apache/samza/pull/958
 
 
   




[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267008613
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void 
testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
 
 Review comment:
   It seems kind of awkward to test like this, where you put in the mock 
registration handler, and then create a new one to actually test with. Could 
you make `KafkaStartpointRegistrationHandler` a static inner class and then 
pass the necessary objects to it? In my opinion, non-static inner classes are 
harder to read/maintain anyways.
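
A rough sketch of the suggested shape, in plain Java so it runs without Mockito. `OffsetSeeker`, `RegistrationHandler`, `visitSpecific` and `recordedCalls` are hypothetical stand-ins invented here, not the actual Samza or Kafka types. The idea is that a static nested class has no hidden reference to an outer instance, so a test can construct it directly and hand it a fake collaborator.

```java
import java.util.ArrayList;
import java.util.List;

final class StaticHandlerSketch {

  /** Hypothetical stand-in for the single consumer call the handler needs. */
  interface OffsetSeeker {
    void seek(String topicPartition, long offset);
  }

  /**
   * Static nested class: no implicit link to an enclosing instance,
   * so every collaborator arrives through the constructor.
   */
  static final class RegistrationHandler {
    private final OffsetSeeker seeker;

    RegistrationHandler(OffsetSeeker seeker) {
      this.seeker = seeker;
    }

    /** Moves the seeker to the specific offset carried by the startpoint. */
    void visitSpecific(String topicPartition, String specificOffset) {
      seeker.seek(topicPartition, Long.parseLong(specificOffset));
    }
  }

  /** Builds the handler directly with a recording fake and returns the calls it made. */
  static List<String> recordedCalls() {
    List<String> calls = new ArrayList<>();
    OffsetSeeker fake = (tp, offset) -> calls.add("seek(" + tp + ", " + offset + ")");
    RegistrationHandler handler = new RegistrationHandler(fake);
    handler.visitSpecific("test-topic-0", "5");
    return calls;
  }

  public static void main(String[] args) {
    System.out.println(recordedCalls());
  }
}
```

With this shape the test no longer needs to build the outer consumer first and then call `outer.new Handler()` on it; the handler's dependencies are visible in its constructor signature.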




[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267001831
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -119,9 +136,11 @@ public void 
testConsumerRegisterOlderOffsetOfTheSamzaSSP() {
 consumer.register(ssp1, "3");
 consumer.register(ssp2, "0");
 
-assertEquals("0", 
consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp0)));
-assertEquals("2", 
consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp1)));
-assertEquals("0", 
consumer.topicPartitionsToOffset.get(KafkaSystemConsumer.toTopicPartition(ssp2)));
+consumer.start();
+
+assertEquals("5", ((StartpointSpecific) 
consumer.topicPartitionToStartpointMap.get(KafkaSystemConsumer.toTopicPartition(ssp0))).getSpecificOffset());
 
 Review comment:
   The name of the test doesn't seem to match the actual test logic, since the 
oldest offset is no longer being used.




[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267009407
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void 
testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointSpecific testStartpointSpecific = new 
StartpointSpecific(TEST_OFFSET);
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
+  }
+
+  @Test
+  public void 
testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Long testTimeStamp = 10L;
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointTimestamp startpointTimestamp = new 
StartpointTimestamp(testTimeStamp);
+final Map offsetForTimesResult = 
ImmutableMap.of(
+TEST_TOPIC_PARTITION, new 
OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+// Mock the consumer interactions.
+
Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
startpointTimestamp);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
 
 Review comment:
   Same as above about not using "any".
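
One reading of the concern, sketched in plain Java rather than Mockito: a wildcard-style check only proves that the method was called, while an exact check also proves it was called with the right values. `RecordingProxy`, `registerWithOffByOne` and `runBuggyRegistration` are hypothetical names made up for this illustration, and the off-by-one bug is deliberate.

```java
import java.util.ArrayList;
import java.util.List;

final class ExactArgumentSketch {

  /** Hypothetical recording fake for the proxy's addTopicPartition call. */
  static final class RecordingProxy {
    private final List<String> calls = new ArrayList<>();

    void addTopicPartition(String partition, long nextOffset) {
      calls.add(partition + "@" + nextOffset);
    }

    // Wildcard-style check: true if the method was called with any arguments at all.
    boolean calledWithAnyArguments() {
      return !calls.isEmpty();
    }

    // Exact check: true only if this precise partition/offset pair was passed.
    boolean calledWith(String partition, long nextOffset) {
      return calls.contains(partition + "@" + nextOffset);
    }
  }

  /** Deliberately buggy code under test: it registers offset + 1 instead of offset. */
  static void registerWithOffByOne(RecordingProxy proxy, String partition, long offset) {
    proxy.addTopicPartition(partition, offset + 1);
  }

  /** Runs the buggy registration for offset 5 and returns the fake for inspection. */
  static RecordingProxy runBuggyRegistration() {
    RecordingProxy proxy = new RecordingProxy();
    registerWithOffByOne(proxy, "test-topic-0", 5L);
    return proxy;
  }

  public static void main(String[] args) {
    RecordingProxy proxy = runBuggyRegistration();
    System.out.println("wildcard check passes: " + proxy.calledWithAnyArguments());
    System.out.println("exact check passes: " + proxy.calledWith("test-topic-0", 5L));
  }
}
```

Here the wildcard check succeeds even though the wrong offset was registered; only the exact check exposes the bug. In Mockito terms that corresponds to passing the expected values to `verify(...)` instead of `any(...)` matchers.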




[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267009990
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void 
testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointSpecific testStartpointSpecific = new 
StartpointSpecific(TEST_OFFSET);
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
+  }
+
+  @Test
+  public void 
testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Long testTimeStamp = 10L;
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointTimestamp startpointTimestamp = new 
StartpointTimestamp(testTimeStamp);
+final Map offsetForTimesResult = 
ImmutableMap.of(
+TEST_TOPIC_PARTITION, new 
OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+// Mock the consumer interactions.
+
Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
 
 Review comment:
   We should be able to check a specific argument instead of using `anyMap`, 
right?




[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r266999014
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -330,6 +325,88 @@ public String getSystemName() {
 return systemName;
   }
 
+  @VisibleForTesting
+  class KafkaStartpointRegistrationHandler implements StartpointVisitor {
+
+@Override
+public void visit(SystemStreamPartition systemStreamPartition, 
StartpointSpecific startpointSpecific) {
+  TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+  long offsetInStartpoint = 
Long.parseLong(startpointSpecific.getSpecificOffset());
+  LOG.info("Updating the consumer fetch offsets of topic partition: {} to 
{}.", topicPartition, offsetInStartpoint);
+
+  // KafkaConsumer is not thread-safe.
+  synchronized (kafkaConsumer) {
+kafkaConsumer.seek(topicPartition, offsetInStartpoint);
+  }
+
+  // add the partition to the proxy
+  proxy.addTopicPartition(systemStreamPartition, 
kafkaConsumer.position(topicPartition));
+}
+
+@Override
+public void visit(SystemStreamPartition systemStreamPartition, 
StartpointTimestamp startpointTimestamp) {
+  Long timestampInStartpoint = startpointTimestamp.getTimestampOffset();
+  TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+  Map topicPartitionsToTimeStamps = 
ImmutableMap.of(topicPartition, timestampInStartpoint);
+
+  // Look up the offset by timestamp.
+  LOG.info("Looking up the offsets of the topic partition: {} by 
timestamp: {}.", topicPartition, timestampInStartpoint);
+  Map topicPartitionToOffsetTimestamps 
= kafkaConsumer.offsetsForTimes(topicPartitionsToTimeStamps);
 
 Review comment:
   Does this need to be protected by the `kafkaConsumer` lock?




[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267010766
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void 
testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointSpecific testStartpointSpecific = new 
StartpointSpecific(TEST_OFFSET);
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
+  }
+
+  @Test
+  public void 
testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Long testTimeStamp = 10L;
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointTimestamp startpointTimestamp = new 
StartpointTimestamp(testTimeStamp);
+final Map offsetForTimesResult = 
ImmutableMap.of(
+TEST_TOPIC_PARTITION, new 
OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+// Mock the consumer interactions.
+
Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
startpointTimestamp);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
+  }
+
+  @Test
+  public void 
testStartpointTimestampVisitorShouldMoveTheConsumerToEndWhenTimestampDoesNotExist()
 {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+
+
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointTimestamp startpointTimestamp = new 
StartpointTimestamp(0L);
+final Map offsetForTimesResult = new 
HashMap<>();
+offsetForTimesResult.put(TEST_TOPIC_PARTITION, null);
+
+// Mock the consumer interactions.
+
Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
startpointTimestamp);
+
+// Mock verifications.
+Mockito.verify(consumer).seekToEnd(ImmutableList.of(TEST_TOPIC_PARTITION));
+Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
+  }
+
+  @Test
+  public void 
testStartpointOldestVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 

[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267009293
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void 
testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointSpecific testStartpointSpecific = new 
StartpointSpecific(TEST_OFFSET);
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
 
 Review comment:
   Can this verify that the correct `SystemStreamPartition` and offset get 
used, instead of just doing "any"?
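
To illustrate why the exact arguments matter, here is a minimal dependency-free sketch. `RecordingPartitionProxy` is a hypothetical hand-rolled fake standing in for the mocked `KafkaConsumerProxy`, and partitions are plain strings for brevity. With Mockito itself, the same effect should come from passing the concrete expected values to `verify(...)` in place of the `any` matchers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical recording fake; it stands in for a mocked KafkaConsumerProxy.
class RecordingPartitionProxy {
  // One recorded call: the partition name and the offset it was registered with.
  static final class Call {
    final String partition;
    final long offset;

    Call(String partition, long offset) {
      this.partition = partition;
      this.offset = offset;
    }

    @Override
    public boolean equals(Object other) {
      if (!(other instanceof Call)) {
        return false;
      }
      Call that = (Call) other;
      return partition.equals(that.partition) && offset == that.offset;
    }

    @Override
    public int hashCode() {
      return Objects.hash(partition, offset);
    }
  }

  final List<Call> calls = new ArrayList<>();

  void addTopicPartition(String partition, long offset) {
    calls.add(new Call(partition, offset));
  }

  // Strict check: exactly one call, with exactly these arguments.
  boolean calledOnceWith(String partition, long offset) {
    return calls.size() == 1 && calls.get(0).equals(new Call(partition, offset));
  }

  // Loose check, comparable to matching with "any": passes for any arguments.
  boolean calledOnce() {
    return calls.size() == 1;
  }
}
```

A call made with the wrong partition or offset still satisfies `calledOnce()`, but not `calledOnceWith(...)`, which is the gap the "any" matchers leave open.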




[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r267010415
 
 

 ##
 File path: 
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemConsumer.java
 ##
 @@ -206,6 +225,166 @@ public void testFetchThresholdBytesDiabled() {
 consumer.stop();
   }
 
+  @Test
+  public void 
testStartpointSpecificOffsetVisitorShouldUpdateTheFetchOffsetInConsumer() {
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointSpecific testStartpointSpecific = new 
StartpointSpecific(TEST_OFFSET);
+
+// Mock the consumer interactions.
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+// Invoke the consumer with startpoint.
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
testStartpointSpecific);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
Mockito.verify(kafkaConsumerProxy).addTopicPartition(Mockito.any(SystemStreamPartition.class),
 Mockito.anyLong());
+  }
+
+  @Test
+  public void 
testStartpointTimestampVisitorShouldUpdateTheFetchOffsetInConsumer() {
+// Define dummy variables for testing.
+final Long testTimeStamp = 10L;
+
+final KafkaConsumer consumer = Mockito.mock(KafkaConsumer.class);
+final KafkaConsumerProxy kafkaConsumerProxy = 
Mockito.mock(KafkaConsumerProxy.class);
+KafkaSystemConsumer kafkaSystemConsumer = new 
KafkaSystemConsumer(consumer, TEST_SYSTEM, new MapConfig(), TEST_CLIENT_ID, 
kafkaConsumerProxy, Mockito.mock(KafkaSystemConsumerMetrics.class), new 
TestClock(), REGISTRATION_HANDLER);
+
+KafkaStartpointRegistrationHandler kafkaStartpointRegistrationHandler = 
kafkaSystemConsumer.new KafkaStartpointRegistrationHandler();
+
+final StartpointTimestamp startpointTimestamp = new 
StartpointTimestamp(testTimeStamp);
+final Map offsetForTimesResult = 
ImmutableMap.of(
+TEST_TOPIC_PARTITION, new 
OffsetAndTimestamp(Long.valueOf(TEST_OFFSET), testTimeStamp));
+
+// Mock the consumer interactions.
+
Mockito.when(consumer.offsetsForTimes(Mockito.anyMap())).thenReturn(offsetForTimesResult);
+Mockito.doNothing().when(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+
+kafkaStartpointRegistrationHandler.visit(TEST_SYSTEM_STREAM_PARTITION, 
startpointTimestamp);
+
+// Mock verifications.
+Mockito.verify(consumer).seek(TEST_TOPIC_PARTITION, 
Long.valueOf(TEST_OFFSET));
+Mockito.verify(consumer).offsetsForTimes(Mockito.anyMap());
 
 Review comment:
   I don't think `offsetsForTimes` has side effects, so you probably don't need 
to verify this particular call. The test won't work if it doesn't get called 
anyways.




[GitHub] [samza] cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
cameronlee314 commented on a change in pull request #918: SAMZA-2094: Implement 
the StartpointVisitor for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#discussion_r266996590
 
 

 ##
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemConsumer.java
 ##
 @@ -330,6 +325,88 @@ public String getSystemName() {
 return systemName;
   }
 
+  @VisibleForTesting
+  class KafkaStartpointRegistrationHandler implements StartpointVisitor {
+
+@Override
+public void visit(SystemStreamPartition systemStreamPartition, 
StartpointSpecific startpointSpecific) {
+  TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
+  long offsetInStartpoint = 
Long.parseLong(startpointSpecific.getSpecificOffset());
+  LOG.info("Updating the consumer fetch offsets of topic partition: {} to 
{}.", topicPartition, offsetInStartpoint);
+
+  // KafkaConsumer is not thread-safe.
+  synchronized (kafkaConsumer) {
+kafkaConsumer.seek(topicPartition, offsetInStartpoint);
+  }
+
+  // add the partition to the proxy
+  proxy.addTopicPartition(systemStreamPartition, 
kafkaConsumer.position(topicPartition));
 
 Review comment:
   Does `kafkaConsumer.position` need to be protected by the lock too?
   Also, please check below for other usages of `kafkaConsumer` outside of the 
lock.
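
One possible shape for this, sketched under assumptions: keep every call that touches the non-thread-safe consumer inside a single synchronized block, and hand the resulting position to the proxy afterwards. `FakeConsumer` and `RecordingProxy` are hypothetical stand-ins (not Kafka or Samza classes), and partitions are plain strings; only the locking structure is the point.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a client that is not thread-safe (not the real KafkaConsumer).
class FakeConsumer {
  private final Map<String, Long> positions = new HashMap<>();

  void seek(String partition, long offset) {
    positions.put(partition, offset);
  }

  long position(String partition) {
    return positions.get(partition);
  }
}

// Hypothetical stand-in for the proxy that records registered partitions.
class RecordingProxy {
  final Map<String, Long> registered = new HashMap<>();

  void addTopicPartition(String partition, long offset) {
    registered.put(partition, offset);
  }
}

class LockedSeekSketch {
  static void seekAndRegister(FakeConsumer consumer, RecordingProxy proxy, String partition, long offset) {
    long nextOffset;
    // Both calls touch the consumer, so both stay under the same lock.
    synchronized (consumer) {
      consumer.seek(partition, offset);
      nextOffset = consumer.position(partition);
    }
    // The proxy call does not touch the consumer, so it can run outside the lock.
    proxy.addTopicPartition(partition, nextOffset);
  }
}
```

Reading the position into a local variable inside the block keeps the critical section short while still covering both consumer calls.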




[GitHub] [samza] prateekm commented on issue #899: SAMZA-2091: Update ZkClient in samza-standalone to use string serializer

2019-03-19 Thread GitBox
prateekm commented on issue #899:  SAMZA-2091: Update ZkClient in 
samza-standalone to use string serializer
URL: https://github.com/apache/samza/pull/899#issuecomment-474469431
 
 
   @shanthoosh Please merge if you don't need anything else for this PR.




[GitHub] [samza] dnishimura commented on a change in pull request #956: SAMZA-2132: Flatten startpoint key when serialized.

2019-03-19 Thread GitBox
dnishimura commented on a change in pull request #956: SAMZA-2132: Flatten 
startpoint key when serialized.
URL: https://github.com/apache/samza/pull/956#discussion_r266984446
 
 

 ##
 File path: 
samza-core/src/main/java/org/apache/samza/startpoint/StartpointKey.java
 ##
 @@ -41,12 +41,29 @@
 this.taskName = taskName;
   }
 
+  @JsonIgnore
 
 Review comment:
   @shanthoosh good idea. I'm always for being explicit. Please see the latest 
commit.




[GitHub] [samza] shanthoosh commented on issue #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
shanthoosh commented on issue #918: SAMZA-2094: Implement the StartpointVisitor 
for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#issuecomment-474441844
 
 
   @dnishimura No, I tried it locally multiple times but was not able to 
reproduce it. But that failure is orthogonal to this change. That test failure 
originates from the API: `KafkaSystemAdmin.createStream()`




[GitHub] [samza] dnishimura commented on issue #918: SAMZA-2094: Implement the StartpointVisitor for the KafkaSystemConsumer.

2019-03-19 Thread GitBox
dnishimura commented on issue #918: SAMZA-2094: Implement the StartpointVisitor 
for the KafkaSystemConsumer.
URL: https://github.com/apache/samza/pull/918#issuecomment-474414111
 
 
   Is the timeout error shown in Travis-CI just a transient thing and not 
reproducible locally?
   ```
   testShouldCreateCoordinatorStream FAILED
   org.apache.samza.SamzaException: Creation of topic test-coordinator-stream failed.
   Caused by:
   java.util.concurrent.TimeoutException
   ```




[GitHub] [samza] vjagadish1989 opened a new pull request #958: SAMZA-2117: Handle race condition in container launch due to incorrect AM accounting

2019-03-19 Thread GitBox
vjagadish1989 opened a new pull request #958: SAMZA-2117: Handle race condition 
in container launch due to incorrect AM accounting
URL: https://github.com/apache/samza/pull/958
 
 
   Steps involved when starting a Samza container:
   1. Issue a request to YARN to launch a container.
   2. Record that container as "pending" launch.
   3. The launch callback succeeds on a different thread: the callback looks at 
the "pending" container and marks it as "running".
   
   The race condition: if the main thread gets pre-empted between 
(1) and (2), the callback thread does not see the container as "pending", 
so it does not transition it to the "running" state.
   
   This PR fixes it by flipping (1) and (2), i.e., recording the intent prior to 
issuing the launch request.
   Added a unit test and refactored existing tests.
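
The ordering issue can be sketched as follows. This is a minimal illustration under assumptions, not the code in this PR: `ContainerAccounting` and its method names are hypothetical, and the launch request is modeled as a `Consumer<String>` so that a callback firing before the main thread continues can be simulated deterministically.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical bookkeeping class; names are illustrative, not Samza's.
class ContainerAccounting {
  final Set<String> pending = ConcurrentHashMap.newKeySet();
  final Set<String> running = ConcurrentHashMap.newKeySet();

  // Launch callback: only a container recorded as pending can become running.
  void onLaunchSuccess(String containerId) {
    if (pending.remove(containerId)) {
      running.add(containerId);
    }
  }

  // Racy order: the request goes out first, so a fast callback finds nothing pending.
  void launchRacy(String containerId, Consumer<String> launchRequest) {
    launchRequest.accept(containerId);
    pending.add(containerId);
  }

  // Fixed order: record the intent, then issue the request.
  void launchFixed(String containerId, Consumer<String> launchRequest) {
    pending.add(containerId);
    launchRequest.accept(containerId);
  }
}
```

Passing `accounting::onLaunchSuccess` as the launch request models the worst case where the callback runs immediately: with `launchRacy` the container stays "pending" indefinitely, while with `launchFixed` it reaches "running".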
   




[DISCUSS] SEP-21: Samza Async API for High Level

2019-03-19 Thread Bharath Kumarasubramanian
Hi all,

I created SEP-21: Samza Async API for High Level. Please find the SEP wiki below:
https://cwiki.apache.org/confluence/display/SAMZA/SEP-21%3A+Samza+Async+API+for+High+Level

Please take a look and chime in with your feedback.

Thanks,
Bharath