[FLINK-5887] [checkpointing] Make CheckpointBarrier type immutable.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ffe75a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ffe75a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ffe75a5

Branch: refs/heads/master
Commit: 8ffe75a54f24cbd8e69c455b42a4e969b943a279
Parents: df16e50
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Feb 22 15:04:46 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Feb 23 18:39:49 2017 +0100

----------------------------------------------------------------------
 .../runtime/checkpoint/CheckpointOptions.java   |  2 +-
 .../io/network/api/CheckpointBarrier.java       | 66 +++++++-------------
 .../io/network/api/CheckpointBarrierTest.java   | 40 ++++++------
 .../api/serialization/EventSerializerTest.java  | 13 ++--
 4 files changed, 46 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
index cb98d10..676cf3b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -46,7 +46,7 @@ public class CheckpointOptions implements Serializable {
 
        private CheckpointOptions(
                        @Nonnull CheckpointType checkpointType,
-                       String targetLocation) {
+                       @Nullable  String targetLocation) {
                this.checkpointType = checkNotNull(checkpointType);
                this.targetLocation = targetLocation;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index a42c25d..97ad90f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -18,18 +18,14 @@
 
 package org.apache.flink.runtime.io.network.api;
 
-import static org.apache.flink.util.Preconditions.checkElementIndex;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.event.RuntimeEvent;
-import org.apache.flink.util.StringUtils;
 
 /**
  * Checkpoint barriers are used to align checkpoints throughout the streaming 
topology. The
@@ -48,11 +44,9 @@ import org.apache.flink.util.StringUtils;
  */
 public class CheckpointBarrier extends RuntimeEvent {
 
-       private long id;
-       private long timestamp;
-       private CheckpointOptions checkpointOptions;
-
-       public CheckpointBarrier() {}
+       private final long id;
+       private final long timestamp;
+       private final CheckpointOptions checkpointOptions;
 
        public CheckpointBarrier(long id, long timestamp, CheckpointOptions 
checkpointOptions) {
                this.id = id;
@@ -75,66 +69,48 @@ public class CheckpointBarrier extends RuntimeEvent {
        // 
------------------------------------------------------------------------
        // Serialization
        // 
------------------------------------------------------------------------
-       
+
+       //
+       //  These methods are inherited form the generic serialization of 
AbstractEvent
+       //  but would require the CheckpointBarrier to be mutable. Since all 
serialization
+       //  for events goes through the EventSerializer class, which has 
special serialization
+       //  for the CheckpointBarrier, we don't need these methods
+       // 
+
        @Override
        public void write(DataOutputView out) throws IOException {
-               out.writeLong(id);
-               out.writeLong(timestamp);
-               CheckpointType checkpointType = 
checkpointOptions.getCheckpointType();
-
-               out.writeInt(checkpointType.ordinal());
-
-               if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
-                       return;
-               } else if (checkpointType == CheckpointType.SAVEPOINT) {
-                       String targetLocation = 
checkpointOptions.getTargetLocation();
-                       checkState(targetLocation != null);
-                       StringUtils.writeString(targetLocation, out);
-               } else {
-                       throw new IOException("Unknown CheckpointType " + 
checkpointType);
-               }
+               throw new UnsupportedOperationException("This method should 
never be called");
        }
 
        @Override
        public void read(DataInputView in) throws IOException {
-               id = in.readLong();
-               timestamp = in.readLong();
-
-               int typeOrdinal = in.readInt();
-               checkElementIndex(typeOrdinal, CheckpointType.values().length, 
"Unknown CheckpointType ordinal");
-               CheckpointType checkpointType = 
CheckpointType.values()[typeOrdinal];
-
-               if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
-                       checkpointOptions = 
CheckpointOptions.forFullCheckpoint();
-               } else if (checkpointType == CheckpointType.SAVEPOINT) {
-                       String targetLocation = StringUtils.readString(in);
-                       checkpointOptions = 
CheckpointOptions.forSavepoint(targetLocation);
-               } else {
-                       throw new IOException("Illegal CheckpointType " + 
checkpointType);
-               }
+               throw new UnsupportedOperationException("This method should 
never be called");
        }
 
-
        // 
------------------------------------------------------------------------
 
        @Override
        public int hashCode() {
-               return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
+               return (int) (id ^ (id >>> 32) ^ timestamp ^ (timestamp >>> 
32));
        }
 
        @Override
        public boolean equals(Object other) {
-               if (other == null || !(other instanceof CheckpointBarrier)) {
+               if (other == this) {
+                       return true;
+               }
+               else if (other == null || other.getClass() != 
CheckpointBarrier.class) {
                        return false;
                }
                else {
                        CheckpointBarrier that = (CheckpointBarrier) other;
-                       return that.id == this.id && that.timestamp == 
this.timestamp;
+                       return that.id == this.id && that.timestamp == 
this.timestamp &&
+                                       
this.checkpointOptions.equals(that.checkpointOptions);
                }
        }
 
        @Override
        public String toString() {
-               return String.format("CheckpointBarrier %d @ %d", id, 
timestamp);
+               return String.format("CheckpointBarrier %d @ %d Options: %s", 
id, timestamp, checkpointOptions);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
index ad9fc16..ba833c3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -18,44 +18,40 @@
 
 package org.apache.flink.runtime.io.network.api;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.util.DataInputDeserializer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
+
 import org.junit.Test;
 
+import static org.junit.Assert.fail;
+
 public class CheckpointBarrierTest {
 
        /**
         * Test serialization of the checkpoint barrier.
+        * The checkpoint barrier does not support its own serialization, in 
order to be immutable.
         */
        @Test
        public void testSerialization() throws Exception {
                long id = Integer.MAX_VALUE + 123123L;
                long timestamp = Integer.MAX_VALUE + 1228L;
 
-               CheckpointOptions checkpoint = 
CheckpointOptions.forFullCheckpoint();
-               testSerialization(id, timestamp, checkpoint);
-
-               CheckpointOptions savepoint = 
CheckpointOptions.forSavepoint("1289031838919123");
-               testSerialization(id, timestamp, savepoint);
-       }
-
-       private void testSerialization(long id, long timestamp, 
CheckpointOptions options) throws IOException {
+               CheckpointOptions options = 
CheckpointOptions.forFullCheckpoint();
                CheckpointBarrier barrier = new CheckpointBarrier(id, 
timestamp, options);
 
-               DataOutputSerializer out = new DataOutputSerializer(1024);
-               barrier.write(out);
-
-               DataInputDeserializer in = new 
DataInputDeserializer(out.getCopyOfBuffer());
-               CheckpointBarrier deserialized = new CheckpointBarrier();
-               deserialized.read(in);
-
-               assertEquals(id, deserialized.getId());
-               assertEquals(timestamp, deserialized.getTimestamp());
-               assertEquals(options.getCheckpointType(), 
deserialized.getCheckpointOptions().getCheckpointType());
-               assertEquals(options.getTargetLocation(), 
deserialized.getCheckpointOptions().getTargetLocation());
+               try {
+                       barrier.write(new DataOutputSerializer(1024));
+                       fail("should throw an exception");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
+
+               try {
+                       barrier.read(new DataInputDeserializer(new byte[32]));
+                       fail("should throw an exception");
+               } catch (UnsupportedOperationException e) {
+                       // expected
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index e674eb7..f51b083 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -147,15 +147,14 @@ public class EventSerializerTest {
         *              thinks the encoded buffer matches the class
         * @throws IOException
         */
-       private final boolean checkIsEvent(final AbstractEvent event,
-               final Class<? extends AbstractEvent> eventClass) throws
-               IOException {
-               final Buffer serializedEvent =
-                       EventSerializer.toBuffer(event);
+       private boolean checkIsEvent(
+                       AbstractEvent event, 
+                       Class<? extends AbstractEvent> eventClass) throws 
IOException {
+
+               final Buffer serializedEvent = EventSerializer.toBuffer(event);
                try {
                        final ClassLoader cl = getClass().getClassLoader();
-                       return EventSerializer
-                               .isEvent(serializedEvent, eventClass, cl);
+                       return EventSerializer.isEvent(serializedEvent, 
eventClass, cl);
                } finally {
                        serializedEvent.recycle();
                }

Reply via email to