Repository: samza Updated Branches: refs/heads/master 985822028 -> 317b6ff1b
SAMZA-1020: Remove methods in Kafka checkpoint classes that were deprecated in the 0.10 release. Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/db31e899 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/db31e899 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/db31e899 Branch: refs/heads/master Commit: db31e8992c7bcbe17a5ad85bbab6105e232a626a Parents: 9858220 Author: vjagadish1989 <jvenk...@linkedin.com> Authored: Thu Sep 29 11:48:22 2016 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Thu Sep 29 11:48:22 2016 -0700 ---------------------------------------------------------------------- .../kafka/KafkaCheckpointLogKey.scala | 23 -------------------- .../kafka/KafkaCheckpointManager.scala | 22 ------------------- .../kafka/TeskKafkaCheckpointLogKey.scala | 10 --------- 3 files changed, 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/db31e899/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala index ea8462d..9ed64c3 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala @@ -57,13 +57,6 @@ class KafkaCheckpointLogKey private (val map: Map[String, String]) { */ def isCheckpointKey = getKey.equals(CHECKPOINT_KEY_TYPE) - /** - * Is this key for a changelog partition mapping? - * - * @return true iff this key's entry is for a changelog partition mapping - */ - @Deprecated - def isChangelogPartitionMapping = getKey.equals(CHANGELOG_PARTITION_KEY_TYPE) /** * If this Key is for a checkpoint entry, return its associated TaskName. @@ -98,17 +91,9 @@ object KafkaCheckpointLogKey { val CHECKPOINT_KEY_KEY = "type" val CHECKPOINT_KEY_TYPE = "checkpoint" - @Deprecated - val CHANGELOG_PARTITION_KEY_TYPE = "changelog-partition-mapping" - val CHECKPOINT_TASKNAME_KEY = "taskName" val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = "systemstreampartition-grouper-factory" - /** - * Partition mapping keys have no dynamic values, so we just need one instance. - */ - @Deprecated - val CHANGELOG_PARTITION_MAPPING_KEY = new KafkaCheckpointLogKey(Map(CHECKPOINT_KEY_KEY -> CHANGELOG_PARTITION_KEY_TYPE)) private val JSON_MAPPER = new ObjectMapper() val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {} @@ -146,14 +131,6 @@ object KafkaCheckpointLogKey { } /** - * Build a key for a changelog partition mapping entry - * - * @return Key for changelog partition mapping entry - */ - @Deprecated - def getChangelogPartitionMappingKey() = CHANGELOG_PARTITION_MAPPING_KEY - - /** * Deserialize a Kafka checkpoint log key * @param bytes Serialized (via JSON) Kafka checkpoint log key * @return Checkpoint log key http://git-wip-us.apache.org/repos/asf/samza/blob/db31e899/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index ea10cae..8f18a92 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -290,28 +290,6 @@ class KafkaCheckpointManager( } } - - /** - * Read through entire log, discarding checkpoints, finding latest changelogPartitionMapping - * To be used for Migration purpose only. In newer version, changelogPartitionMapping will be handled through coordinator stream - */ - @Deprecated - def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = { - var changelogPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]() - - def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isChangelogPartitionMapping - - def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = { - changelogPartitionMapping = serde.changelogPartitionMappingFromBytes(Utils.readBytes(payload)) - - debug("Adding changelog partition mapping" + changelogPartitionMapping) - } - - readLog(CHANGELOG_PARTITION_MAPPING_LOG4j, shouldHandleEntry, handleCheckpoint) - - changelogPartitionMapping - } - override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic) } http://git-wip-us.apache.org/repos/asf/samza/blob/db31e899/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala index c360b6c..38f3dc2 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala @@ -40,16 +40,6 @@ class TestKafkaCheckpointLogKey { } @Test - def changelogPartitionMappingKeySerializationRoundTrip() { - val key = KafkaCheckpointLogKey.getChangelogPartitionMappingKey() - val asBytes = key.toBytes() - val backFromBytes = KafkaCheckpointLogKey.fromBytes(asBytes) - - assertEquals(key, backFromBytes) - assertNotSame(key, backFromBytes) - } - - @Test def differingSSPGrouperFactoriesCauseException() { val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new TaskName("TN"))