[8/9] flink git commit: [FLINK-6604] [cep] Remove java serialization from the library.

2017-05-17 Thread kkloudas
[FLINK-6604] [cep] Remove java serialization from the library.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d80af819
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d80af819
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d80af819

Branch: refs/heads/release-1.3
Commit: d80af81972ba5adf291d891881aee26b97ec7a60
Parents: 34a6020
Author: kkloudas 
Authored: Tue May 16 17:07:29 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:40:27 2017 +0200

--
 .../org/apache/flink/cep/nfa/DeweyNumber.java   |  98 ++-
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 517 ++--
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 596 ++-
 .../java/org/apache/flink/cep/nfa/State.java|   2 +
 .../flink/cep/nfa/compiler/NFACompiler.java |  81 ++-
 .../AbstractKeyedCEPPatternOperator.java|  16 +-
 .../java/org/apache/flink/cep/nfa/NFATest.java  | 182 --
 .../apache/flink/cep/nfa/SharedBufferTest.java  |  14 +-
 .../cep/operator/CEPFrom12MigrationTest.java|  99 ++-
 .../cep/operator/CEPMigration11to13Test.java| 102 +++-
 .../flink/cep/operator/CEPOperatorTest.java | 110 +++-
 11 files changed, 1499 insertions(+), 318 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d80af819/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index fd3fafa..3827956 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -18,6 +18,13 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 
@@ -40,14 +47,14 @@ public class DeweyNumber implements Serializable {
deweyNumber = new int[]{start};
}
 
-   protected DeweyNumber(int[] deweyNumber) {
-   this.deweyNumber = deweyNumber;
-   }
-
public DeweyNumber(DeweyNumber number) {
this.deweyNumber = Arrays.copyOf(number.deweyNumber, 
number.deweyNumber.length);
}
 
+   private DeweyNumber(int[] deweyNumber) {
+   this.deweyNumber = deweyNumber;
+   }
+
/**
 * Checks whether this dewey number is compatible to the other dewey 
number.
 *
@@ -175,4 +182,87 @@ public class DeweyNumber implements Serializable {
return new DeweyNumber(deweyNumber);
}
}
+
+   /**
+* A {@link TypeSerializer} for the {@link DeweyNumber} which serves as 
a version number.
+*/
+   public static class DeweyNumberSerializer extends 
TypeSerializerSingleton {
+
+   private static final long serialVersionUID = 
-5086792497034943656L;
+
+   private final IntSerializer elemSerializer = 
IntSerializer.INSTANCE;
+
+   @Override
+   public boolean isImmutableType() {
+   return false;
+   }
+
+   @Override
+   public DeweyNumber createInstance() {
+   return new DeweyNumber(1);
+   }
+
+   @Override
+   public DeweyNumber copy(DeweyNumber from) {
+   return new DeweyNumber(from);
+   }
+
+   @Override
+   public DeweyNumber copy(DeweyNumber from, DeweyNumber reuse) {
+   return copy(from);
+   }
+
+   @Override
+   public int getLength() {
+   return -1;
+   }
+
+   @Override
+   public void serialize(DeweyNumber record, DataOutputView 
target) throws IOException {
+   final int size = record.length();
+   target.writeInt(size);
+   for (int i = 0; i < size; i++) {
+   elemSerializer.serialize(record.deweyNumber[i], 
target);
+   }
+   }
+
+   @Override
+   public DeweyNumber deserialize(DataInputView source) throws 
IOException {
+   final 

[8/9] flink git commit: [FLINK-6604] [cep] Remove java serialization from the library.

2017-05-17 Thread kkloudas
[FLINK-6604] [cep] Remove java serialization from the library.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a54d05e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a54d05e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a54d05e

Branch: refs/heads/master
Commit: 7a54d05ecd33b6dc140a7146f0efa90d64471f47
Parents: f7ebcb0
Author: kkloudas 
Authored: Tue May 16 17:07:29 2017 +0200
Committer: kkloudas 
Committed: Wed May 17 14:37:34 2017 +0200

--
 .../org/apache/flink/cep/nfa/DeweyNumber.java   |  98 ++-
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 517 ++--
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 596 ++-
 .../java/org/apache/flink/cep/nfa/State.java|   2 +
 .../flink/cep/nfa/compiler/NFACompiler.java |  81 ++-
 .../AbstractKeyedCEPPatternOperator.java|  16 +-
 .../java/org/apache/flink/cep/nfa/NFATest.java  | 182 --
 .../apache/flink/cep/nfa/SharedBufferTest.java  |  14 +-
 .../cep/operator/CEPFrom12MigrationTest.java|  99 ++-
 .../cep/operator/CEPMigration11to13Test.java| 102 +++-
 .../flink/cep/operator/CEPOperatorTest.java | 110 +++-
 11 files changed, 1499 insertions(+), 318 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7a54d05e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index fd3fafa..3827956 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -18,6 +18,13 @@
 
 package org.apache.flink.cep.nfa;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
 
@@ -40,14 +47,14 @@ public class DeweyNumber implements Serializable {
deweyNumber = new int[]{start};
}
 
-   protected DeweyNumber(int[] deweyNumber) {
-   this.deweyNumber = deweyNumber;
-   }
-
public DeweyNumber(DeweyNumber number) {
this.deweyNumber = Arrays.copyOf(number.deweyNumber, 
number.deweyNumber.length);
}
 
+   private DeweyNumber(int[] deweyNumber) {
+   this.deweyNumber = deweyNumber;
+   }
+
/**
 * Checks whether this dewey number is compatible to the other dewey 
number.
 *
@@ -175,4 +182,87 @@ public class DeweyNumber implements Serializable {
return new DeweyNumber(deweyNumber);
}
}
+
+   /**
+* A {@link TypeSerializer} for the {@link DeweyNumber} which serves as 
a version number.
+*/
+   public static class DeweyNumberSerializer extends 
TypeSerializerSingleton {
+
+   private static final long serialVersionUID = 
-5086792497034943656L;
+
+   private final IntSerializer elemSerializer = 
IntSerializer.INSTANCE;
+
+   @Override
+   public boolean isImmutableType() {
+   return false;
+   }
+
+   @Override
+   public DeweyNumber createInstance() {
+   return new DeweyNumber(1);
+   }
+
+   @Override
+   public DeweyNumber copy(DeweyNumber from) {
+   return new DeweyNumber(from);
+   }
+
+   @Override
+   public DeweyNumber copy(DeweyNumber from, DeweyNumber reuse) {
+   return copy(from);
+   }
+
+   @Override
+   public int getLength() {
+   return -1;
+   }
+
+   @Override
+   public void serialize(DeweyNumber record, DataOutputView 
target) throws IOException {
+   final int size = record.length();
+   target.writeInt(size);
+   for (int i = 0; i < size; i++) {
+   elemSerializer.serialize(record.deweyNumber[i], 
target);
+   }
+   }
+
+   @Override
+   public DeweyNumber deserialize(DataInputView source) throws 
IOException {
+   final int