Repository: samza
Updated Branches:
  refs/heads/master 985822028 -> 317b6ff1b


SAMZA-1020: Remove methods in Kafka checkpoint classes that were deprecated in 
the 0.10 release.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/db31e899
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/db31e899
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/db31e899

Branch: refs/heads/master
Commit: db31e8992c7bcbe17a5ad85bbab6105e232a626a
Parents: 9858220
Author: vjagadish1989 <jvenk...@linkedin.com>
Authored: Thu Sep 29 11:48:22 2016 -0700
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Thu Sep 29 11:48:22 2016 -0700

----------------------------------------------------------------------
 .../kafka/KafkaCheckpointLogKey.scala           | 23 --------------------
 .../kafka/KafkaCheckpointManager.scala          | 22 -------------------
 .../kafka/TeskKafkaCheckpointLogKey.scala       | 10 ---------
 3 files changed, 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/db31e899/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
index ea8462d..9ed64c3 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
@@ -57,13 +57,6 @@ class KafkaCheckpointLogKey private (val map: Map[String, 
String]) {
    */
   def isCheckpointKey = getKey.equals(CHECKPOINT_KEY_TYPE)
 
-  /**
-   * Is this key for a changelog partition mapping?
-   *
-   * @return true iff this key's entry is for a changelog partition mapping
-   */
-  @Deprecated
-  def isChangelogPartitionMapping = getKey.equals(CHANGELOG_PARTITION_KEY_TYPE)
 
   /**
    * If this Key is for a checkpoint entry, return its associated TaskName.
@@ -98,17 +91,9 @@ object KafkaCheckpointLogKey {
   val CHECKPOINT_KEY_KEY = "type"
   val CHECKPOINT_KEY_TYPE = "checkpoint"
 
-  @Deprecated
-  val CHANGELOG_PARTITION_KEY_TYPE = "changelog-partition-mapping"
-
   val CHECKPOINT_TASKNAME_KEY = "taskName"
   val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = 
"systemstreampartition-grouper-factory"
 
-  /**
-   * Partition mapping keys have no dynamic values, so we just need one 
instance.
-   */
-  @Deprecated
-  val CHANGELOG_PARTITION_MAPPING_KEY = new 
KafkaCheckpointLogKey(Map(CHECKPOINT_KEY_KEY -> CHANGELOG_PARTITION_KEY_TYPE))
 
   private val JSON_MAPPER = new ObjectMapper()
   val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {}
@@ -146,14 +131,6 @@ object KafkaCheckpointLogKey {
   }
 
   /**
-   * Build a key for a changelog partition mapping entry
-   *
-   * @return Key for changelog partition mapping entry
-   */
-  @Deprecated
-  def getChangelogPartitionMappingKey() = CHANGELOG_PARTITION_MAPPING_KEY
-
-  /**
    * Deserialize a Kafka checkpoint log key
    * @param bytes Serialized (via JSON) Kafka checkpoint log key
    * @return Checkpoint log key

http://git-wip-us.apache.org/repos/asf/samza/blob/db31e899/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index ea10cae..8f18a92 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -290,28 +290,6 @@ class KafkaCheckpointManager(
     }
   }
 
-
-  /**
-   * Read through entire log, discarding checkpoints, finding latest 
changelogPartitionMapping
-   * To be used for Migration purpose only. In newer version, 
changelogPartitionMapping will be handled through coordinator stream
-   */
-  @Deprecated
-  def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = 
{
-    var changelogPartitionMapping: util.Map[TaskName, java.lang.Integer] = new 
util.HashMap[TaskName, java.lang.Integer]()
-
-    def shouldHandleEntry(key: KafkaCheckpointLogKey) = 
key.isChangelogPartitionMapping
-
-    def handleCheckpoint(payload: ByteBuffer, 
checkpointKey:KafkaCheckpointLogKey): Unit = {
-      changelogPartitionMapping = 
serde.changelogPartitionMappingFromBytes(Utils.readBytes(payload))
-
-      debug("Adding changelog partition mapping" + changelogPartitionMapping)
-    }
-
-    readLog(CHANGELOG_PARTITION_MAPPING_LOG4j, shouldHandleEntry, 
handleCheckpoint)
-
-    changelogPartitionMapping
-  }
-
   override def toString = "KafkaCheckpointManager [systemName=%s, 
checkpointTopic=%s]" format(systemName, checkpointTopic)
 }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/db31e899/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala
index c360b6c..38f3dc2 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TeskKafkaCheckpointLogKey.scala
@@ -40,16 +40,6 @@ class TestKafkaCheckpointLogKey {
   }
 
   @Test
-  def changelogPartitionMappingKeySerializationRoundTrip() {
-    val key = KafkaCheckpointLogKey.getChangelogPartitionMappingKey()
-    val asBytes = key.toBytes()
-    val backFromBytes = KafkaCheckpointLogKey.fromBytes(asBytes)
-
-    assertEquals(key, backFromBytes)
-    assertNotSame(key, backFromBytes)
-  }
-
-  @Test
   def differingSSPGrouperFactoriesCauseException() {
 
     val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new 
TaskName("TN"))

Reply via email to