[FLINK-5887] [checkpointing] Make CheckpointBarrier type immutable.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ffe75a5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ffe75a5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ffe75a5 Branch: refs/heads/master Commit: 8ffe75a54f24cbd8e69c455b42a4e969b943a279 Parents: df16e50 Author: Stephan Ewen <se...@apache.org> Authored: Wed Feb 22 15:04:46 2017 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Feb 23 18:39:49 2017 +0100 ---------------------------------------------------------------------- .../runtime/checkpoint/CheckpointOptions.java | 2 +- .../io/network/api/CheckpointBarrier.java | 66 +++++++------------- .../io/network/api/CheckpointBarrierTest.java | 40 ++++++------ .../api/serialization/EventSerializerTest.java | 13 ++-- 4 files changed, 46 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java index cb98d10..676cf3b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java @@ -46,7 +46,7 @@ public class CheckpointOptions implements Serializable { private CheckpointOptions( @Nonnull CheckpointType checkpointType, - String targetLocation) { + @Nullable String targetLocation) { this.checkpointType = checkNotNull(checkpointType); this.targetLocation = targetLocation; } 
http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java index a42c25d..97ad90f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java @@ -18,18 +18,14 @@ package org.apache.flink.runtime.io.network.api; -import static org.apache.flink.util.Preconditions.checkElementIndex; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; import java.io.IOException; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.checkpoint.CheckpointOptions; -import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType; import org.apache.flink.runtime.event.RuntimeEvent; -import org.apache.flink.util.StringUtils; /** * Checkpoint barriers are used to align checkpoints throughout the streaming topology. 
The @@ -48,11 +44,9 @@ import org.apache.flink.util.StringUtils; */ public class CheckpointBarrier extends RuntimeEvent { - private long id; - private long timestamp; - private CheckpointOptions checkpointOptions; - - public CheckpointBarrier() {} + private final long id; + private final long timestamp; + private final CheckpointOptions checkpointOptions; public CheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) { this.id = id; @@ -75,66 +69,48 @@ public class CheckpointBarrier extends RuntimeEvent { // ------------------------------------------------------------------------ // Serialization // ------------------------------------------------------------------------ - + + // + // These methods are inherited from the generic serialization of AbstractEvent + // but would require the CheckpointBarrier to be mutable. Since all serialization + // for events goes through the EventSerializer class, which has special serialization + // for the CheckpointBarrier, we don't need these methods + // + @Override public void write(DataOutputView out) throws IOException { - out.writeLong(id); - out.writeLong(timestamp); - CheckpointType checkpointType = checkpointOptions.getCheckpointType(); - - out.writeInt(checkpointType.ordinal()); - - if (checkpointType == CheckpointType.FULL_CHECKPOINT) { - return; - } else if (checkpointType == CheckpointType.SAVEPOINT) { - String targetLocation = checkpointOptions.getTargetLocation(); - checkState(targetLocation != null); - StringUtils.writeString(targetLocation, out); - } else { - throw new IOException("Unknown CheckpointType " + checkpointType); - } + throw new UnsupportedOperationException("This method should never be called"); } @Override public void read(DataInputView in) throws IOException { - id = in.readLong(); - timestamp = in.readLong(); - - int typeOrdinal = in.readInt(); - checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal"); - CheckpointType checkpointType 
= CheckpointType.values()[typeOrdinal]; - - if (checkpointType == CheckpointType.FULL_CHECKPOINT) { - checkpointOptions = CheckpointOptions.forFullCheckpoint(); - } else if (checkpointType == CheckpointType.SAVEPOINT) { - String targetLocation = StringUtils.readString(in); - checkpointOptions = CheckpointOptions.forSavepoint(targetLocation); - } else { - throw new IOException("Illegal CheckpointType " + checkpointType); - } + throw new UnsupportedOperationException("This method should never be called"); } - // ------------------------------------------------------------------------ @Override public int hashCode() { - return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32)); + return (int) (id ^ (id >>> 32) ^ timestamp ^ (timestamp >>> 32)); } @Override public boolean equals(Object other) { - if (other == null || !(other instanceof CheckpointBarrier)) { + if (other == this) { + return true; + } + else if (other == null || other.getClass() != CheckpointBarrier.class) { return false; } else { CheckpointBarrier that = (CheckpointBarrier) other; - return that.id == this.id && that.timestamp == this.timestamp; + return that.id == this.id && that.timestamp == this.timestamp && + this.checkpointOptions.equals(that.checkpointOptions); } } @Override public String toString() { - return String.format("CheckpointBarrier %d @ %d", id, timestamp); + return String.format("CheckpointBarrier %d @ %d Options: %s", id, timestamp, checkpointOptions); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java index ad9fc16..ba833c3 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java @@ -18,44 +18,40 @@ package org.apache.flink.runtime.io.network.api; -import static org.junit.Assert.assertEquals; - -import java.io.IOException; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.util.DataInputDeserializer; import org.apache.flink.runtime.util.DataOutputSerializer; + import org.junit.Test; +import static org.junit.Assert.fail; + public class CheckpointBarrierTest { /** * Test serialization of the checkpoint barrier. + * The checkpoint barrier does not support its own serialization, in order to be immutable. */ @Test public void testSerialization() throws Exception { long id = Integer.MAX_VALUE + 123123L; long timestamp = Integer.MAX_VALUE + 1228L; - CheckpointOptions checkpoint = CheckpointOptions.forFullCheckpoint(); - testSerialization(id, timestamp, checkpoint); - - CheckpointOptions savepoint = CheckpointOptions.forSavepoint("1289031838919123"); - testSerialization(id, timestamp, savepoint); - } - - private void testSerialization(long id, long timestamp, CheckpointOptions options) throws IOException { + CheckpointOptions options = CheckpointOptions.forFullCheckpoint(); CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, options); - DataOutputSerializer out = new DataOutputSerializer(1024); - barrier.write(out); - - DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); - CheckpointBarrier deserialized = new CheckpointBarrier(); - deserialized.read(in); - - assertEquals(id, deserialized.getId()); - assertEquals(timestamp, deserialized.getTimestamp()); - assertEquals(options.getCheckpointType(), deserialized.getCheckpointOptions().getCheckpointType()); - assertEquals(options.getTargetLocation(), deserialized.getCheckpointOptions().getTargetLocation()); + try { + 
barrier.write(new DataOutputSerializer(1024)); + fail("should throw an exception"); + } catch (UnsupportedOperationException e) { + // expected + } + + try { + barrier.read(new DataInputDeserializer(new byte[32])); + fail("should throw an exception"); + } catch (UnsupportedOperationException e) { + // expected + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8ffe75a5/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java index e674eb7..f51b083 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java @@ -147,15 +147,14 @@ public class EventSerializerTest { * thinks the encoded buffer matches the class * @throws IOException */ - private final boolean checkIsEvent(final AbstractEvent event, - final Class<? extends AbstractEvent> eventClass) throws - IOException { - final Buffer serializedEvent = - EventSerializer.toBuffer(event); + private boolean checkIsEvent( + AbstractEvent event, + Class<? extends AbstractEvent> eventClass) throws IOException { + + final Buffer serializedEvent = EventSerializer.toBuffer(event); try { final ClassLoader cl = getClass().getClassLoader(); - return EventSerializer - .isEvent(serializedEvent, eventClass, cl); + return EventSerializer.isEvent(serializedEvent, eventClass, cl); } finally { serializedEvent.recycle(); }