[8/9] flink git commit: [FLINK-6604] [cep] Remove java serialization from the library.
[FLINK-6604] [cep] Remove java serialization from the library. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d80af819 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d80af819 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d80af819 Branch: refs/heads/release-1.3 Commit: d80af81972ba5adf291d891881aee26b97ec7a60 Parents: 34a6020 Author: kkloudas Authored: Tue May 16 17:07:29 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:40:27 2017 +0200 -- .../org/apache/flink/cep/nfa/DeweyNumber.java | 98 ++- .../main/java/org/apache/flink/cep/nfa/NFA.java | 517 ++-- .../org/apache/flink/cep/nfa/SharedBuffer.java | 596 ++- .../java/org/apache/flink/cep/nfa/State.java| 2 + .../flink/cep/nfa/compiler/NFACompiler.java | 81 ++- .../AbstractKeyedCEPPatternOperator.java| 16 +- .../java/org/apache/flink/cep/nfa/NFATest.java | 182 -- .../apache/flink/cep/nfa/SharedBufferTest.java | 14 +- .../cep/operator/CEPFrom12MigrationTest.java| 99 ++- .../cep/operator/CEPMigration11to13Test.java| 102 +++- .../flink/cep/operator/CEPOperatorTest.java | 110 +++- 11 files changed, 1499 insertions(+), 318 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java index fd3fafa..3827956 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java @@ -18,6 +18,13 @@ package org.apache.flink.cep.nfa; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import 
org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; import java.io.Serializable; import java.util.Arrays; @@ -40,14 +47,14 @@ public class DeweyNumber implements Serializable { deweyNumber = new int[]{start}; } - protected DeweyNumber(int[] deweyNumber) { - this.deweyNumber = deweyNumber; - } - public DeweyNumber(DeweyNumber number) { this.deweyNumber = Arrays.copyOf(number.deweyNumber, number.deweyNumber.length); } + private DeweyNumber(int[] deweyNumber) { + this.deweyNumber = deweyNumber; + } + /** * Checks whether this dewey number is compatible to the other dewey number. * @@ -175,4 +182,87 @@ public class DeweyNumber implements Serializable { return new DeweyNumber(deweyNumber); } } + + /** +* A {@link TypeSerializer} for the {@link DeweyNumber} which serves as a version number. +*/ + public static class DeweyNumberSerializer extends TypeSerializerSingleton<DeweyNumber> { + + private static final long serialVersionUID = -5086792497034943656L; + + private final IntSerializer elemSerializer = IntSerializer.INSTANCE; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public DeweyNumber createInstance() { + return new DeweyNumber(1); + } + + @Override + public DeweyNumber copy(DeweyNumber from) { + return new DeweyNumber(from); + } + + @Override + public DeweyNumber copy(DeweyNumber from, DeweyNumber reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(DeweyNumber record, DataOutputView target) throws IOException { + final int size = record.length(); + target.writeInt(size); + for (int i = 0; i < size; i++) { + elemSerializer.serialize(record.deweyNumber[i], target); + } + } + + @Override + public DeweyNumber deserialize(DataInputView source) throws IOException { + final
[8/9] flink git commit: [FLINK-6604] [cep] Remove java serialization from the library.
[FLINK-6604] [cep] Remove java serialization from the library. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a54d05e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a54d05e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a54d05e Branch: refs/heads/master Commit: 7a54d05ecd33b6dc140a7146f0efa90d64471f47 Parents: f7ebcb0 Author: kkloudas Authored: Tue May 16 17:07:29 2017 +0200 Committer: kkloudas Committed: Wed May 17 14:37:34 2017 +0200 -- .../org/apache/flink/cep/nfa/DeweyNumber.java | 98 ++- .../main/java/org/apache/flink/cep/nfa/NFA.java | 517 ++-- .../org/apache/flink/cep/nfa/SharedBuffer.java | 596 ++- .../java/org/apache/flink/cep/nfa/State.java| 2 + .../flink/cep/nfa/compiler/NFACompiler.java | 81 ++- .../AbstractKeyedCEPPatternOperator.java| 16 +- .../java/org/apache/flink/cep/nfa/NFATest.java | 182 -- .../apache/flink/cep/nfa/SharedBufferTest.java | 14 +- .../cep/operator/CEPFrom12MigrationTest.java| 99 ++- .../cep/operator/CEPMigration11to13Test.java| 102 +++- .../flink/cep/operator/CEPOperatorTest.java | 110 +++- 11 files changed, 1499 insertions(+), 318 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java -- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java index fd3fafa..3827956 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java @@ -18,6 +18,13 @@ package org.apache.flink.cep.nfa; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton; +import 
org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import java.io.IOException; import java.io.Serializable; import java.util.Arrays; @@ -40,14 +47,14 @@ public class DeweyNumber implements Serializable { deweyNumber = new int[]{start}; } - protected DeweyNumber(int[] deweyNumber) { - this.deweyNumber = deweyNumber; - } - public DeweyNumber(DeweyNumber number) { this.deweyNumber = Arrays.copyOf(number.deweyNumber, number.deweyNumber.length); } + private DeweyNumber(int[] deweyNumber) { + this.deweyNumber = deweyNumber; + } + /** * Checks whether this dewey number is compatible to the other dewey number. * @@ -175,4 +182,87 @@ public class DeweyNumber implements Serializable { return new DeweyNumber(deweyNumber); } } + + /** +* A {@link TypeSerializer} for the {@link DeweyNumber} which serves as a version number. +*/ + public static class DeweyNumberSerializer extends TypeSerializerSingleton<DeweyNumber> { + + private static final long serialVersionUID = -5086792497034943656L; + + private final IntSerializer elemSerializer = IntSerializer.INSTANCE; + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public DeweyNumber createInstance() { + return new DeweyNumber(1); + } + + @Override + public DeweyNumber copy(DeweyNumber from) { + return new DeweyNumber(from); + } + + @Override + public DeweyNumber copy(DeweyNumber from, DeweyNumber reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(DeweyNumber record, DataOutputView target) throws IOException { + final int size = record.length(); + target.writeInt(size); + for (int i = 0; i < size; i++) { + elemSerializer.serialize(record.deweyNumber[i], target); + } + } + + @Override + public DeweyNumber deserialize(DataInputView source) throws IOException { + final int