[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3117


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96651445
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -96,18 +98,19 @@ private CassandraSink(SingleOutputStreamOperator sink) {
 
/**
 * Sets an additional, user provided hash for this operator.
-*
 * 
 * The user provided hash is an alternative to the generated hashes, that is considered when identifying an
 * operator through the default hash mechanics fails (e.g. because of changes between Flink versions.
--- End diff --

Ok, I misunderstood which closing bracket we were talking about :-). Fixed 
those typos.




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96640105
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
 ---
@@ -18,27 +18,29 @@
 
 package org.apache.flink.streaming.api.graph;
 
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.util.StringUtils;
 
 import java.util.HashMap;
 import java.util.Map;
 
 /**
- * StreamGraphHasher that works with user provided hashes.
+ * StreamGraphHasher that works with user provided hashes. This us useful in case we want to set (alternative) hashes
--- End diff --

This us useful -> This is useful




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96640591
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -96,18 +98,19 @@ private CassandraSink(SingleOutputStreamOperator sink) {
 
/**
 * Sets an additional, user provided hash for this operator.
-*
 * 
 * The user provided hash is an alternative to the generated hashes, that is considered when identifying an
 * operator through the default hash mechanics fails (e.g. because of changes between Flink versions.
--- End diff --

Still missing the closing parenthesis (also in the other copies of this javadoc).




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-18 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96630784
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -95,6 +95,28 @@ private CassandraSink(SingleOutputStreamOperator sink) {
}
 
/**
+* Sets an additional, user provided hash for this operator.
+*
+* 
+* The user provided hash is an alternative to the generated hashes, that is considered when identifying an
+* operator through the default hash mechanics fails (e.g. because of changes between Flink versions.
+*
+* Important: this hash needs to be unique per transformation and job. Otherwise, job
--- End diff --

If it is not true, then how can state be assigned without ambiguity if 
the hash is used as the basis for the JobVertexID?
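The ambiguity argument can be made concrete: if a user hash becomes the basis for a JobVertexID, two operators sharing the same hash cannot be told apart when state is assigned, so a duplicate should be rejected before submission. Below is a minimal, Flink-independent sketch of such a client-side check, assuming the user hashes have already been collected into a map from node id to hash string; `UserHashUniqueness` and `checkUnique` are hypothetical names, not Flink API.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch (not Flink API): fail fast if two operators of the same
// job were given the same user-provided hash.
final class UserHashUniqueness {

    private UserHashUniqueness() {}

    /**
     * @param userHashByNodeId node id -> user-provided hash; a null value means
     *                         the node relies on its generated hash only
     * @throws IllegalArgumentException if two nodes share a hash
     */
    static void checkUnique(Map<Integer, String> userHashByNodeId) {
        Map<String, Integer> ownerByHash = new HashMap<>();
        for (Map.Entry<Integer, String> entry : userHashByNodeId.entrySet()) {
            String hash = entry.getValue();
            if (hash == null) {
                continue; // no user hash for this node
            }
            // Hex ids are case-insensitive, so "AB" and "ab" denote the same id.
            String key = hash.toLowerCase(Locale.ROOT);
            Integer previousOwner = ownerByHash.putIfAbsent(key, entry.getKey());
            if (previousOwner != null) {
                throw new IllegalArgumentException("User-provided hash " + hash
                        + " is assigned to both node " + previousOwner
                        + " and node " + entry.getKey());
            }
        }
    }
}
```

The check only needs one pass over the nodes, so it is cheap enough to run every time the graph is built on the client.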




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96227360
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
 ---
@@ -87,6 +87,26 @@ public String getName() {
return this;
}
 
+
+   /**
+* Sets an additional, user provided hash for this operator.
--- End diff --

Above comments apply to these as well.




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96226989
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -95,6 +95,28 @@ private CassandraSink(SingleOutputStreamOperator sink) {
}
 
/**
+* Sets an additional, user provided hash for this operator.
+*
+* 
--- End diff --

Not needed




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96229201
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamTransformation.java
 ---
@@ -205,6 +209,31 @@ public void setMaxParallelism(int maxParallelism) {
}
 
/**
+* Sets an additional, user provided hash for this operator.
--- End diff --

Above comments apply to these as well.




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96227219
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -95,6 +95,28 @@ private CassandraSink(SingleOutputStreamOperator sink) {
}
 
/**
+* Sets an additional, user provided hash for this operator.
+*
+* 
+* The user provided hash is an alternative to the generated hashes, that is considered when identifying an
+* operator through the default hash mechanics fails (e.g. because of changes between Flink versions.
+*
+* Important: this hash needs to be unique per transformation and job. Otherwise, job
+* submission will fail.
+*
+* @param hash the user provided hash for this operator.
+* @return The operator with the user provided hash.
+*/
+   public CassandraSink provideAdditionalNodeHash(String hash) {
--- End diff --

I think the `provide` prefix gives the impression that it is possible to 
provide multiple additional node hashes. I would replace this with `set`. What 
do you think?
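The naming concern is about semantics: a `set` method suggests one value that a second call replaces, whereas `provide` hints at accumulation. A minimal sketch of the single-valued, fluent style being proposed; `OperatorHandle` and `setUidHash` are hypothetical stand-ins for the operator classes in this PR, not their actual code.

```java
// Hypothetical stand-in for an operator handle such as a sink or
// SingleOutputStreamOperator: it stores at most one user-provided hash.
final class OperatorHandle {

    private String userHash; // null until the user sets one

    /** Sets (and on repeated calls replaces) the user-provided hash. */
    OperatorHandle setUidHash(String hash) {
        this.userHash = hash;
        return this; // fluent, so further configuration calls can be chained
    }

    String getUserHash() {
        return userHash;
    }
}
```

With this shape, calling the method twice leaves exactly one hash, which is what the `set` prefix communicates.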




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96227716
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * StreamGraphHasher that works with user provided hashes.
--- End diff --

I think we should provide a more detailed description of why we need 
this, etc.




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96227773
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * StreamGraphHasher that works with user provided hashes.
+ */
+public class StreamGraphUserHashHasher implements StreamGraphHasher {
+   @Override
+   public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+   HashMap<Integer, byte[]> hashResult = new HashMap<>();
+   for (StreamNode streamNode : streamGraph.getStreamNodes()) {
+   String userHash = streamNode.getUserHash();
+   if (null != userHash) {
+
--- End diff --

Unnecessary empty line.




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96227285
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSink.java
 ---
@@ -77,6 +77,25 @@ protected DataStreamSink(DataStream inputStream, StreamSink operator) {
}
 
/**
+* Sets an additional, user provided hash for this operator.
--- End diff --

Above comments apply to these as well.




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96229428
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -95,6 +95,28 @@ private CassandraSink(SingleOutputStreamOperator sink) {
}
 
/**
+* Sets an additional, user provided hash for this operator.
+*
+* 
+* The user provided hash is an alternative to the generated hashes, that is considered when identifying an
+* operator through the default hash mechanics fails (e.g. because of changes between Flink versions.
+*
+* Important: this hash needs to be unique per transformation and job. Otherwise, job
+* submission will fail.
+*
+* @param hash the user provided hash for this operator.
+* @return The operator with the user provided hash.
+*/
--- End diff --

We don't annotate classes outside of some projects, but should we add the 
`@PublicEvolving` annotation here as well? (It is also missing for `uid` in this 
class.)




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96229220
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
 ---
@@ -201,6 +201,28 @@ class DataStream[T](stream: JavaStream[T]) {
   }
 
   /**
+* Sets an additional, user provided hash for this operator.
--- End diff --

Above comments apply to these as well.




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96228294
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
 ---
@@ -272,12 +273,20 @@ public void setStateKeySerializer(TypeSerializer stateKeySerializer) {
this.stateKeySerializer = stateKeySerializer;
}
 
-   public String getTransformationId() {
-   return transformationId;
+   public String getTransformationUID() {
+   return transformationUID;
}
 
-   void setTransformationId(String transformationId) {
-   this.transformationId = transformationId;
+   void setTransformationUID(String transformationId) {
+   this.transformationUID = transformationId;
+   }
+
+   public String getUserHash() {
+   return userHash;
+   }
+
+   public void setUserHash(String userHash) {
+   this.userHash = userHash;
--- End diff --

Since this is executed on the client before submitting the job, it might be 
helpful to do some early sanity checking here. The expected String is a hex 
representation of a JobVertexID.
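A possible early sanity check, sketched without Flink dependencies. It assumes the expected string is the hex form of a 16-byte ID (the size of Flink's `AbstractID`, which `JobVertexID` extends), i.e. exactly 32 hex characters; `UserHashValidator` and `parseUserHash` are hypothetical names.

```java
import java.util.Objects;

// Hypothetical client-side validation of a user-provided hash, assuming the
// hash must be the hex representation of a 16-byte JobVertexID.
final class UserHashValidator {

    static final int ID_LENGTH_BYTES = 16;

    private UserHashValidator() {}

    /** Parses the hash into bytes, failing fast with a descriptive message. */
    static byte[] parseUserHash(String hash) {
        Objects.requireNonNull(hash, "user-provided hash must not be null");
        if (hash.length() != 2 * ID_LENGTH_BYTES) {
            throw new IllegalArgumentException("Expected " + (2 * ID_LENGTH_BYTES)
                    + " hex characters but got " + hash.length() + ": '" + hash + "'");
        }
        byte[] bytes = new byte[ID_LENGTH_BYTES];
        for (int i = 0; i < ID_LENGTH_BYTES; i++) {
            int high = hexValue(hash.charAt(2 * i));
            int low = hexValue(hash.charAt(2 * i + 1));
            if (high < 0 || low < 0) {
                throw new IllegalArgumentException("Not a hex string: '" + hash + "'");
            }
            bytes[i] = (byte) ((high << 4) | low);
        }
        return bytes;
    }

    // Accepts only ASCII hex digits; returns -1 for anything else.
    private static int hexValue(char c) {
        if (c >= '0' && c <= '9') {
            return c - '0';
        }
        if (c >= 'a' && c <= 'f') {
            return c - 'a' + 10;
        }
        if (c >= 'A' && c <= 'F') {
            return c - 'A' + 10;
        }
        return -1;
    }
}
```

Calling this from the setter would point at the offending call immediately when a copied hash is truncated or mistyped, rather than surfacing later when the hash is converted.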




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96227062
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -95,6 +95,28 @@ private CassandraSink(SingleOutputStreamOperator sink) {
}
 
/**
+* Sets an additional, user provided hash for this operator.
+*
+* 
+* The user provided hash is an alternative to the generated hashes, that is considered when identifying an
+* operator through the default hash mechanics fails (e.g. because of changes between Flink versions.
+*
+* Important: this hash needs to be unique per transformation and job. Otherwise, job
--- End diff --

I think this is not true for the additional hash?




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-16 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96227741
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * StreamGraphHasher that works with user provided hashes.
+ */
+public class StreamGraphUserHashHasher implements StreamGraphHasher {
+   @Override
--- End diff --

Empty lines are missing above.




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96222946
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
 ---
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.util.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * StreamGraphHasher that works with user provided hashes.
+ */
+public class StreamGraphUserHashHasher implements StreamGraphHasher {
+   @Override
+   public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
+   HashMap<Integer, byte[]> hashResult = new HashMap<>();
+   for (StreamNode streamNode : streamGraph.getStreamNodes()) {
+   String userHash = streamNode.getUserHash();
+   if (null != userHash) {
+
+   for (StreamEdge inEdge : streamNode.getInEdges()) {
+   if (isChainable(inEdge, streamGraph.isChainingEnabled())) {
+   throw new UnsupportedOperationException("Cannot assign user-specified hash "
+   + "to intermediate node in chain. This will be supported in future "
+   + "versions of Flink. As a work around start new chain at task "
+   + streamNode.getOperatorName() + ".");
+   }
+   }
+
+   hashResult.put(streamNode.getId(), StringUtils.hexStringToByte(userHash));
+   }
+   }
+
+   return hashResult;
+   }
+
+   private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
--- End diff --

Can't we reuse the ```isChainable``` method in the 
```StreamJobGraphGenerator``` instead? Otherwise we risk these conditions going 
out of sync.
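One way to keep the two conditions from drifting apart is a single static predicate that both the job graph generator and the user-hash hasher call. The sketch below assumes a heavily simplified edge model; `ChainingRules`, `Edge` and the three conditions are hypothetical and deliberately do not reproduce Flink's real chaining rules.

```java
// Hypothetical sketch: one shared chaining predicate instead of two copies.
final class ChainingRules {

    private ChainingRules() {}

    /** Simplified edge model, for illustration only. */
    static final class Edge {
        final boolean forwardPartitioned;
        final int upstreamParallelism;
        final int downstreamParallelism;

        Edge(boolean forwardPartitioned, int upstreamParallelism, int downstreamParallelism) {
            this.forwardPartitioned = forwardPartitioned;
            this.upstreamParallelism = upstreamParallelism;
            this.downstreamParallelism = downstreamParallelism;
        }
    }

    /**
     * Single source of truth: both the job graph generator and the
     * user-hash hasher delegate here, so they cannot disagree.
     */
    static boolean isChainable(Edge edge, boolean chainingEnabled) {
        return chainingEnabled
                && edge.forwardPartitioned
                && edge.upstreamParallelism == edge.downstreamParallelism;
    }
}
```

The design point is only the centralization; which class should own the shared method, and with what visibility, is left open.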




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96222455
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
 ---
@@ -422,6 +425,43 @@ public void testManualHashAssignmentForStartNodeInInChain() throws Exception {
env.getStreamGraph().getJobGraph();
}
 
+   @Test
+   public void testUserProvidedHashing() {
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+   List<String> userHashes = Arrays.asList("", "");
+
+   env.addSource(new NoOpSourceFunction(), "src").provideAdditionalNodeHash(userHashes.get(0))
+   .map(new NoOpMapFunction())
+   .filter(new NoOpFilterFunction())
+   .keyBy(new NoOpKeySelector())
+   .reduce(new NoOpReduceFunction()).name("reduce").provideAdditionalNodeHash(userHashes.get(1));
+
+   StreamGraph streamGraph = env.getStreamGraph();
+   int idx = 1;
+   for (JobVertex jobVertex : streamGraph.getJobGraph().getVertices()) {
+   Assert.assertEquals(jobVertex.getIdAlternatives().get(1).toString(), userHashes.get(idx));
+   --idx;
+   }
+   }
+
+   @Test
+   public void testUserProvidedHashingOnChainNotSupported() {
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+
+   env.addSource(new NoOpSourceFunction(), "src").provideAdditionalNodeHash("")
+   .map(new NoOpMapFunction()).provideAdditionalNodeHash("")
--- End diff --

The fact that this fails the job should probably be documented in the 
javadocs of ```provideAdditionalNodeHash```.
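The behavior in question, taken from the quoted `StreamGraphUserHashHasher`, is that a user hash on any node with a chainable input is rejected. A Flink-independent sketch of that traversal, assuming nodes expose their input edges and the chaining test is passed in as a `Predicate`; `UserHashTraversal`, `Node` and `collectUserHashes` are hypothetical, and unlike the quoted code the hash is kept as a string instead of being converted to bytes. The exception it throws is exactly the outcome the javadoc would need to mention.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of the traversal in StreamGraphUserHashHasher:
// collect user hashes, but refuse them on nodes inside an operator chain.
final class UserHashTraversal {

    private UserHashTraversal() {}

    /** Minimal node model; E is whatever type represents an input edge. */
    static final class Node<E> {
        final int id;
        final String name;
        final String userHash; // null if the user did not provide one
        final List<E> inEdges;

        Node(int id, String name, String userHash, List<E> inEdges) {
            this.id = id;
            this.name = name;
            this.userHash = userHash;
            this.inEdges = inEdges;
        }
    }

    static <E> Map<Integer, String> collectUserHashes(List<Node<E>> nodes, Predicate<E> isChainable) {
        Map<Integer, String> result = new HashMap<>();
        for (Node<E> node : nodes) {
            if (node.userHash == null) {
                continue; // this node keeps only its generated hash
            }
            for (E inEdge : node.inEdges) {
                if (isChainable.test(inEdge)) {
                    // A chainable input means the node is not the head of its chain.
                    throw new UnsupportedOperationException("Cannot assign a user-provided hash to "
                            + "intermediate node '" + node.name + "' of a chain; "
                            + "start a new chain at this operator instead.");
                }
            }
            result.put(node.id, node.userHash);
        }
        return result;
    }
}
```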




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/3117#discussion_r96220999
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -95,6 +95,28 @@ private CassandraSink(SingleOutputStreamOperator sink) {
}
 
/**
+* Sets an additional, user provided hash for this operator.
+*
+* 
+* The user provided hash is an alternative to the generated hashes, that is considered when identifying an
+* operator through the default hash mechanics fails (e.g. because of changes between Flink versions.
--- End diff --

missing closing brackets.




[GitHub] flink pull request #3117: [FLINK-5480] Introduce user-provided hash for JobV...

2017-01-13 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/3117

[FLINK-5480] Introduce user-provided hash for JobVertexes

This PR allows users to provide (alternative) hashes for operators in a 
StreamGraph. This can make migration between Flink versions easier in case the 
automatically produced hashes are incompatible between versions. For example, 
users could just copy the old hashes from the web UI to their job.
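The description treats the user hash as a fallback: when the ID generated by the current Flink version no longer matches the ID stored with old state, the user-provided ID (for example copied from the web UI) can still match. A minimal sketch of that lookup, assuming each vertex exposes an ordered list of ID alternatives with the generated ID first (consistent with the test in this thread reading the user hash at index 1 of `getIdAlternatives()`) and that saved state is keyed by ID string; `StateMatching` and `findState` are hypothetical and do not claim to mirror Flink's actual restore logic.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: match saved operator state to a vertex by trying its
// ID alternatives in order (generated ID first, then user-provided ones).
final class StateMatching {

    private StateMatching() {}

    static <S> Optional<S> findState(List<String> idAlternatives, Map<String, S> stateBySavedId) {
        for (String id : idAlternatives) {
            S state = stateBySavedId.get(id);
            if (state != null) {
                return Optional.of(state); // first matching ID wins
            }
        }
        return Optional.empty(); // no alternative matches the saved state
    }
}
```

If the generated hash changed between versions, the lookup falls through to the user-provided alternative, which is the migration path the PR aims for.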

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink UserProvidedHash

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3117.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3117


commit 96ef2c041bfe33462600d72b8ec1472f53c852f2
Author: Stefan Richter 
Date:   2017-01-12T17:57:52Z

[FLINK-5480] Introduce user-provided hash for JobVertexes



