[GitHub] [samza] shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.
shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.
URL: https://github.com/apache/samza/pull/1251#discussion_r366655309

## File path: samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala

@@ -316,16 +316,19 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
     val kafkaChangeLogProperties = new Properties
-    val appConfig = new ApplicationConfig(config)
     // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57,
     // 1.0.2, or 1.1.0 (see KAFKA-6568)
     // Adjust changelog topic setting, when TTL is set on a RocksDB store
     // - Disable log compaction on Kafka changelog topic
     // - Set topic TTL to be the same as RocksDB TTL
-    Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
+    val storeTTLkey = "stores.%s.rocksdb.ttl.ms" format name
+    Option(config.get(storeTTLkey)) match {

Review comment: Removed.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
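The refactor in the diff above just hoists the formatted per-store config key into a local so the same string can be reused for later lookups. A minimal sketch of that pattern in Java (the `StoreTtlSketch` class and its helper names are illustrative, not part of Samza; only the key template is taken from the diff):

```java
import java.util.Map;

final class StoreTtlSketch {
    // Per-store RocksDB TTL config key, as in the diff: "stores.%s.rocksdb.ttl.ms".
    static String storeTtlKey(String storeName) {
        return String.format("stores.%s.rocksdb.ttl.ms", storeName);
    }

    // Hoisting the key into one place lets the Option lookup and any later
    // reads (e.g. the getInt check in a follow-up diff) share the same string.
    static String storeTtl(Map<String, String> config, String storeName) {
        return config.get(storeTtlKey(storeName));
    }
}
```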
[GitHub] [samza] shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.
shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.
URL: https://github.com/apache/samza/pull/1251#discussion_r366655296

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java

@@ -541,8 +541,11 @@ public KafkaStreamSpec toKafkaSpec(StreamSpec spec) {
         new KafkaStreamSpec(spec.getId(), spec.getPhysicalName(), systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties);
     } else if (spec.isCheckpointStream()) {
-      kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), systemName))
-          .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()));
+      Properties checkpointTopicProperties = new Properties();
+      checkpointTopicProperties.putAll(spec.getConfig());
+      kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), spec.getSystemName()))
+          .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()))
+          .copyWithProperties(checkpointTopicProperties);

Review comment: Yes, adding `copyWithConfig` to `KafkaStreamSpec` might be a good convenience API. We have additional cleanup planned for this control flow in the near future; if acceptable, I can couple this change with that. Since it's a critical bug fix, I would prefer to keep the change minimal by limiting it to the fix alone and doing the cleanup later.
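The chained `copyWith*` calls in the diff follow an immutable copy-on-modify pattern: each call returns a new spec rather than mutating the receiver. A minimal sketch of that pattern, including the `copyWithConfig` convenience suggested in the review (the `SimpleSpec` class and its fields are illustrative stand-ins, not Samza's actual `KafkaStreamSpec` API):

```java
import java.util.Map;
import java.util.Properties;

// Illustrative stand-in for a stream spec; not the real KafkaStreamSpec.
final class SimpleSpec {
    final String name;
    final int replicationFactor;
    final Properties properties;

    SimpleSpec(String name, int replicationFactor, Properties properties) {
        this.name = name;
        this.replicationFactor = replicationFactor;
        this.properties = properties;
    }

    // Each copyWith* returns a new spec; the original is never mutated.
    SimpleSpec copyWithReplicationFactor(int rf) {
        return new SimpleSpec(name, rf, properties);
    }

    SimpleSpec copyWithProperties(Properties props) {
        return new SimpleSpec(name, replicationFactor, props);
    }

    // The convenience API suggested in the review: accept a config map
    // directly, so callers need not build a Properties object themselves.
    SimpleSpec copyWithConfig(Map<String, String> config) {
        Properties props = new Properties();
        props.putAll(config);
        return copyWithProperties(props);
    }
}
```

With such a method, the two extra lines in the diff that build `checkpointTopicProperties` by hand would collapse into a single `copyWithConfig(spec.getConfig())` call.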
[GitHub] [samza] shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.
shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.
URL: https://github.com/apache/samza/pull/1251#discussion_r366608326

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java

@@ -541,8 +541,11 @@ public KafkaStreamSpec toKafkaSpec(StreamSpec spec) {
         new KafkaStreamSpec(spec.getId(), spec.getPhysicalName(), systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties);
     } else if (spec.isCheckpointStream()) {
-      kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), systemName))
-          .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()));
+      Properties checkpointTopicProperties = new Properties();
+      checkpointTopicProperties.putAll(spec.getConfig());
+      kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), spec.getSystemName()))
+          .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()))
+          .copyWithProperties(checkpointTopicProperties);

Review comment: The control flow to create the checkpoint stream is the following:

1. `KafkaCheckpointManagerFactory` builds the checkpoint spec, using `kafkaConfig.getCheckpointTopicProperties` to populate the config in `CheckpointSpec`.
2. `KafkaCheckpointManager.createStream` uses the checkpoint spec built in step 1, and `KafkaSystemAdmin.createStream` in turn invokes `KafkaSystemAdmin.toKafkaSpec`.
3. `KafkaSystemAdmin.toKafkaSpec(CheckpointSpec)` above converts the incoming spec of type `StreamSpec` to a `KafkaStreamSpec`.

The config in the incoming checkpoint `StreamSpec` was already built using `kafkaConfig.getCheckpointTopicProperties` in `KafkaCheckpointManagerFactory`, so reading the checkpoint properties from config here again would be redundant and might return an incorrect config bag.

What do you think?
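The redundancy argument in the steps above can be sketched as a data-flow: the topic properties are attached to the spec exactly once when it is built, and the conversion step should carry them forward rather than re-reading raw config. A minimal sketch (class and method names are ours, not Samza's):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the checkpoint-stream creation flow described above.
final class CheckpointFlowSketch {
    // Step 1 (the factory): populate the spec's config from the checkpoint
    // topic properties exactly once.
    static Map<String, String> buildSpecConfig(Map<String, String> checkpointTopicProperties) {
        return new HashMap<>(checkpointTopicProperties);
    }

    // Step 3 (the conversion): carry the spec's already-built config forward.
    // Re-deriving the properties from raw config here would be redundant and
    // could diverge from what step 1 produced.
    static Map<String, String> toKafkaSpecConfig(Map<String, String> specConfig) {
        return new HashMap<>(specConfig);
    }
}
```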
[GitHub] [samza] shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.
shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.
URL: https://github.com/apache/samza/pull/1251#discussion_r366541989

## File path: samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala

@@ -316,16 +316,18 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
     val kafkaChangeLogProperties = new Properties
-    val appConfig = new ApplicationConfig(config)
     // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57,
     // 1.0.2, or 1.1.0 (see KAFKA-6568)
     // Adjust changelog topic setting, when TTL is set on a RocksDB store
     // - Disable log compaction on Kafka changelog topic
     // - Set topic TTL to be the same as RocksDB TTL
-    Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
+    val storeTTLkey = "stores.%s.rocksdb.ttl.ms" format name
+    Option(config.get(storeTTLkey)) match {
       case Some(rocksDbTtl) =>
-        if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
+        if (config.getInt(storeTTLkey, 0) < 0) {
+          kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
+        } else if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {

Review comment: Yes. At the end of this function, the changelog properties, if defined for the store in user config, are picked up and populated into the resulting changelog topic properties, so this cleanup.policy check is still necessary. We could simplify the code, but I prefer not to couple cleanup with a critical fix. What do you think?
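The TTL-based policy selection in the diff above can be sketched as a standalone decision function. This is a simplification of the Scala logic, not Samza's actual code; the method and parameter names are ours, and the `retention.ms` assignment reflects the "set topic TTL to be the same as RocksDB TTL" comment in the diff:

```java
import java.util.Properties;

final class ChangelogPolicySketch {
    // Decide changelog topic properties from a RocksDB store TTL, mirroring the
    // diff: a negative TTL means "retain forever", so the topic is compacted;
    // a non-negative TTL disables compaction and sets a matching retention,
    // but only when the user has not set a cleanup policy themselves.
    static Properties changelogCleanupProperties(Integer rocksDbTtlMs, String userCleanupPolicy) {
        Properties props = new Properties();
        if (rocksDbTtlMs == null) {
            return props; // no TTL configured: leave topic defaults alone
        }
        if (rocksDbTtlMs < 0) {
            props.setProperty("cleanup.policy", "compact");
        } else if (userCleanupPolicy == null) {
            // User-supplied changelog properties are applied later in the real
            // code and must win, hence the containsKey-style guard here.
            props.setProperty("cleanup.policy", "delete");
            props.setProperty("retention.ms", String.valueOf(rocksDbTtlMs));
        }
        return props;
    }
}
```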
[GitHub] [samza] shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.
shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.
URL: https://github.com/apache/samza/pull/1251#discussion_r366527827

## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java

@@ -541,8 +541,8 @@ public KafkaStreamSpec toKafkaSpec(StreamSpec spec) {
         new KafkaStreamSpec(spec.getId(), spec.getPhysicalName(), systemName, 1, coordinatorStreamReplicationFactor, coordinatorStreamProperties);
     } else if (spec.isCheckpointStream()) {
-      kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), systemName))
-          .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get()));
+      kafkaSpec = KafkaStreamSpec.fromSpec(spec)

Review comment:

1. Fixed the indentation.
2. Updated the checkpoint spec building to be consistent with `KafkaCheckpointManager`.
[GitHub] [samza] shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.
shanthoosh commented on a change in pull request #1251: SAMZA-2431: Fix the checkpoint and changelog topic auto-creation.
URL: https://github.com/apache/samza/pull/1251#discussion_r366528780

## File path: samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala

@@ -316,16 +316,18 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     val filteredConfigs = config.subset(KafkaConfig.CHANGELOG_STREAM_KAFKA_SETTINGS format name, true)
     val kafkaChangeLogProperties = new Properties
-    val appConfig = new ApplicationConfig(config)
     // SAMZA-1600: do not use the combination of "compact,delete" as cleanup policy until we pick up Kafka broker 0.11.0.57,
     // 1.0.2, or 1.1.0 (see KAFKA-6568)
     // Adjust changelog topic setting, when TTL is set on a RocksDB store
     // - Disable log compaction on Kafka changelog topic
     // - Set topic TTL to be the same as RocksDB TTL
-    Option(config.get("stores.%s.rocksdb.ttl.ms" format name)) match {
+    val storeTTLkey = "stores.%s.rocksdb.ttl.ms" format name
+    Option(config.get(storeTTLkey)) match {
       case Some(rocksDbTtl) =>
-        if (!config.containsKey("stores.%s.changelog.kafka.cleanup.policy" format name)) {
+        if (config.getInt(storeTTLkey, 0) < 0) {

Review comment: Done.