[GitHub] [samza] shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.

2020-01-14 Thread GitBox
shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the 
checkpoint and changelog topic auto-creation.
URL: https://github.com/apache/samza/pull/1251#discussion_r366655309
 
 

 ##
 File path: samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
 ##
 @@ -316,16 +316,19 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
 val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
 val kafkaChangeLogProperties = new Properties
 
-val appConfig = new ApplicationConfig(config)
 // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57,
 // 1.0.2, or 1.1.0 (see KAFKA-6568)
 
 // Adjust changelog topic setting, when TTL is set on a RocksDB store
 //  - Disable log compaction on Kafka changelog topic
 //  - Set topic TTL to be the same as RocksDB TTL
-Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
+val storeTTLkey = "stores.%s.rocksdb.ttl.ms" format name
+Option(config.get(storeTTLkey)) match {
 
 Review comment:
   Removed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [samza] shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.

2020-01-14 Thread GitBox
shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the 
checkpoint and changelog topic auto-creation.
URL: https://github.com/apache/samza/pull/1251#discussion_r366655296
 
 

 ##
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
 ##
 @@ -541,8 +541,11 @@ public KafkaStreamSpec toKafkaSpec(StreamSpec spec) {
   new KafkaStreamSpec(spec.getId(), spec.getPhysicalName(), systemName, 1, coordinatorStreamReplicationFactor,
   coordinatorStreamProperties);
 } else if (spec.isCheckpointStream()) {
-  kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), systemName))
-  .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()));
+  Properties checkpointTopicProperties = new Properties();
+  checkpointTopicProperties.putAll(spec.getConfig());
+  kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), spec.getSystemName()))
+  .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()))
+  .copyWithProperties(checkpointTopicProperties);
 
 Review comment:
   Yes, adding a `copyWithConfig` method to `KafkaStreamSpec` might be a good 
   convenience API. We have additional cleanup planned for this control flow in 
   the near future, and if acceptable, I can fold this change into that cleanup. 
   Since this is a critical bug fix, I would prefer to keep the change minimal, 
   limiting it to the fix alone, and do the cleanup later.


[GitHub] [samza] shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.

2020-01-14 Thread GitBox
shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the 
checkpoint and changelog topic auto-creation.
URL: https://github.com/apache/samza/pull/1251#discussion_r366608326
 
 

 ##
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
 ##
 @@ -541,8 +541,11 @@ public KafkaStreamSpec toKafkaSpec(StreamSpec spec) {
   new KafkaStreamSpec(spec.getId(), spec.getPhysicalName(), systemName, 1, coordinatorStreamReplicationFactor,
   coordinatorStreamProperties);
 } else if (spec.isCheckpointStream()) {
-  kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), systemName))
-  .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()));
+  Properties checkpointTopicProperties = new Properties();
+  checkpointTopicProperties.putAll(spec.getConfig());
+  kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), spec.getSystemName()))
+  .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()))
+  .copyWithProperties(checkpointTopicProperties);
 
 Review comment:
   The control flow to create the checkpoint stream is the following:
   
   1. `KafkaCheckpointManagerFactory` builds the checkpoint spec, using 
   `kafkaConfig.getCheckpointTopicProperties` to populate the config in 
   `CheckpointSpec`.
   2. `KafkaCheckpointManager.createStream` passes the checkpoint spec built in 
   step 1 to `KafkaSystemAdmin.createStream`, which in turn invokes 
   `KafkaSystemAdmin.toKafkaSpec`.
   3. `KafkaSystemAdmin.toKafkaSpec(CheckpointSpec)` above converts the 
   incoming spec of type `StreamSpec` to `KafkaStreamSpec`.

   The config in the incoming checkpoint `StreamSpec` is already built using 
   `kafkaConfig.getCheckpointTopicProperties` in `KafkaCheckpointManagerFactory`. 
   So calling `kafkaConfig.getCheckpointTopicProperties` here again would be 
   redundant and might return an incorrect config bag.
   
   What do you think?
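
To make the three steps concrete, here is a simplified sketch of the flow. All class and method names below (`SpecSketch`, `CheckpointFlowSketch`, and so on) are hypothetical stand-ins rather than the real Samza classes; the point is only that the topic properties are resolved once in step 1 and carried through by the spec.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical, simplified stand-in for a StreamSpec that carries its own config. */
final class SpecSketch {
  final String physicalName;
  final Map<String, String> config;

  SpecSketch(String physicalName, Map<String, String> config) {
    this.physicalName = physicalName;
    this.config = new HashMap<>(config);
  }
}

/** Hypothetical stand-in for the factory -> manager -> admin chain. */
final class CheckpointFlowSketch {
  // Step 1: the factory resolves the checkpoint topic properties once
  // and stores them in the spec.
  static SpecSketch buildCheckpointSpec(String topic, Map<String, String> checkpointTopicProperties) {
    return new SpecSketch(topic, checkpointTopicProperties);
  }

  // Step 2: stream creation hands the spec built in step 1 to the conversion.
  static Properties createStream(SpecSketch spec) {
    return toKafkaTopicProperties(spec);
  }

  // Step 3: the conversion copies the config already present in the spec
  // instead of re-deriving it from the job config.
  static Properties toKafkaTopicProperties(SpecSketch spec) {
    Properties topicProperties = new Properties();
    topicProperties.putAll(spec.config);
    return topicProperties;
  }
}
```

Because step 3 only copies what the spec already holds, whatever the factory decided in step 1 is exactly what reaches topic creation.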


[GitHub] [samza] shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.

2020-01-14 Thread GitBox
shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the 
checkpoint and changelog topic auto-creation.
URL: https://github.com/apache/samza/pull/1251#discussion_r366541989
 
 

 ##
 File path: samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
 ##
 @@ -316,16 +316,18 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
 val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
 val kafkaChangeLogProperties = new Properties
 
-val appConfig = new ApplicationConfig(config)
 // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57,
 // 1.0.2, or 1.1.0 (see KAFKA-6568)
 
 // Adjust changelog topic setting, when TTL is set on a RocksDB store
 //  - Disable log compaction on Kafka changelog topic
 //  - Set topic TTL to be the same as RocksDB TTL
-Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
+val storeTTLkey = "stores.%s.rocksdb.ttl.ms" format name
+Option(config.get(storeTTLkey)) match {
   case Some(rocksDbTtl) =>
-if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
+if (config.getInt(storeTTLkey, 0) < 0) {
+  kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+} else if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
 
 Review comment:
   Yes. At the end of this function, any changelog properties defined for the 
   store in the user config are picked up and copied into the resulting 
   changelog topic properties, so this cleanup.policy check is still necessary. 
   We could simplify the code, but I prefer not to couple cleanup with a 
   critical fix. What do you think?
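
As a rough sketch of the decision order described here, written in Java rather than the Scala of `KafkaConfig`: `ChangelogPolicySketch` and its parameters are hypothetical. The `delete` / `retention.ms` values in the TTL branch are assumptions inferred from the code comments in the diff ("Disable log compaction", "Set topic TTL to be the same as RocksDB TTL"), and the behaviour when no TTL is set is also assumed.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical sketch of the changelog cleanup-policy decision; not the real KafkaConfig code. */
final class ChangelogPolicySketch {
  /**
   * @param ttlMs the store's RocksDB TTL in milliseconds, or null when it is not configured
   * @param userChangelogSettings user-defined changelog topic settings for the store
   */
  static Properties changelogProperties(Integer ttlMs, Map<String, String> userChangelogSettings) {
    Properties props = new Properties();
    if (ttlMs == null || ttlMs < 0) {
      // A negative TTL keeps the topic compacted (as in the diff).
      // Treating an unset TTL the same way is an assumption of this sketch.
      props.setProperty("cleanup.policy", "compact");
    } else if (!userChangelogSettings.containsKey("cleanup.policy")) {
      // Assumption: with a TTL and no explicit user policy, disable compaction
      // and align the topic retention with the store TTL.
      props.setProperty("cleanup.policy", "delete");
      props.setProperty("retention.ms", String.valueOf(ttlMs));
    }
    // User-defined changelog settings are applied last, so they always win.
    props.putAll(userChangelogSettings);
    return props;
  }
}
```

In this sketch the `containsKey` check matters even though user settings are applied last: without it, a user who explicitly chose a cleanup policy would still end up with a TTL-derived `retention.ms` on the topic.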


[GitHub] [samza] shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.

2020-01-14 Thread GitBox
shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the 
checkpoint and changelog topic auto-creation.
URL: https://github.com/apache/samza/pull/1251#discussion_r366527827
 
 

 ##
 File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
 ##
 @@ -541,8 +541,8 @@ public KafkaStreamSpec toKafkaSpec(StreamSpec spec) {
   new KafkaStreamSpec(spec.getId(), spec.getPhysicalName(), systemName, 1, coordinatorStreamReplicationFactor,
   coordinatorStreamProperties);
 } else if (spec.isCheckpointStream()) {
-  kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), systemName))
-  .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()));
+  kafkaSpec = KafkaStreamSpec.fromSpec(spec)
 
 Review comment:
   1. Fixed the indentation. 
   2. Updated the checkpoint spec building to be consistent with 
`KafkaCheckpointManager`.


[GitHub] [samza] shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.

2020-01-14 Thread GitBox
shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the 
checkpoint and changelog topic auto-creation.
URL: https://github.com/apache/samza/pull/1251#discussion_r366528780
 
 

 ##
 File path: samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
 ##
 @@ -316,16 +316,18 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
 val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
 val kafkaChangeLogProperties = new Properties
 
-val appConfig = new ApplicationConfig(config)
 // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57,
 // 1.0.2, or 1.1.0 (see KAFKA-6568)
 
 // Adjust changelog topic setting, when TTL is set on a RocksDB store
 //  - Disable log compaction on Kafka changelog topic
 //  - Set topic TTL to be the same as RocksDB TTL
-Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
+val storeTTLkey = "stores.%s.rocksdb.ttl.ms" format name
+Option(config.get(storeTTLkey)) match {
   case Some(rocksDbTtl) =>
-if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
+if (config.getInt(storeTTLkey, 0) < 0) {
 
 Review comment:
   Done.


[GitHub] [samza] shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.

2020-01-14 Thread GitBox
shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the 
checkpoint and changelog topic auto-creation.
URL: https://github.com/apache/samza/pull/1251#discussion_r366541989
 
 

 ##
 File path: samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
 ##
 @@ -316,16 +316,18 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
 val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
 val kafkaChangeLogProperties = new Properties
 
-val appConfig = new ApplicationConfig(config)
 // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57,
 // 1.0.2, or 1.1.0 (see KAFKA-6568)
 
 // Adjust changelog topic setting, when TTL is set on a RocksDB store
 //  - Disable log compaction on Kafka changelog topic
 //  - Set topic TTL to be the same as RocksDB TTL
-Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
+val storeTTLkey = "stores.%s.rocksdb.ttl.ms" format name
+Option(config.get(storeTTLkey)) match {
   case Some(rocksDbTtl) =>
-if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
+if (config.getInt(storeTTLkey, 0) < 0) {
+  kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+} else if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
 
 Review comment:
   Yes. At the end of this function, any changelog properties defined for the 
   store in the user config are picked up and copied into the resulting 
   changelog topic properties, so this additional cleanup.policy check is still 
   necessary. We could simplify the code, but I prefer not to couple cleanup 
   with a critical fix. What do you think?

