[jira] [Updated] (FLINK-3222) Incorrect shift amount in OperatorCheckpointStats#hashCode()

2016-05-19 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3222:
--
Description: 
Here is related code:
{code}
result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length 
>>> 32));
{code}
subTaskStats.length is an int.

The shift amount is greater than 31 bits.

  was:
Here is related code:
{code}
result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length 
>>> 32));
{code}
subTaskStats.length is an int.


The shift amount is greater than 31 bits.


> Incorrect shift amount in OperatorCheckpointStats#hashCode()
> 
>
> Key: FLINK-3222
> URL: https://issues.apache.org/jira/browse/FLINK-3222
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> Here is related code:
> {code}
> result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length 
> >>> 32));
> {code}
> subTaskStats.length is an int.
> The shift amount is greater than 31 bits.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3734) Unclosed DataInputView in AbstractAlignedProcessingTimeWindowOperator#restoreState()

2016-05-19 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3734:
--
Description: 
{code}
DataInputView in = inputState.getState(getUserCodeClassloader());

final long nextEvaluationTime = in.readLong();
final long nextSlideTime = in.readLong();

AbstractKeyedTimePanes panes = 
createPanes(keySelector, function);
panes.readFromInput(in, keySerializer, stateTypeSerializer);

restoredState = new RestoredState<>(panes, nextEvaluationTime, 
nextSlideTime);
  }
{code}

DataInputView in is not closed upon return.

  was:
{code}
DataInputView in = inputState.getState(getUserCodeClassloader());

final long nextEvaluationTime = in.readLong();
final long nextSlideTime = in.readLong();

AbstractKeyedTimePanes panes = 
createPanes(keySelector, function);
panes.readFromInput(in, keySerializer, stateTypeSerializer);

restoredState = new RestoredState<>(panes, nextEvaluationTime, 
nextSlideTime);
  }
{code}
DataInputView in is not closed upon return.


> Unclosed DataInputView in 
> AbstractAlignedProcessingTimeWindowOperator#restoreState()
> 
>
> Key: FLINK-3734
> URL: https://issues.apache.org/jira/browse/FLINK-3734
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Priority: Minor
>
> {code}
> DataInputView in = inputState.getState(getUserCodeClassloader());
> final long nextEvaluationTime = in.readLong();
> final long nextSlideTime = in.readLong();
> AbstractKeyedTimePanes panes = 
> createPanes(keySelector, function);
> panes.readFromInput(in, keySerializer, stateTypeSerializer);
> restoredState = new RestoredState<>(panes, nextEvaluationTime, 
> nextSlideTime);
>   }
> {code}
> DataInputView in is not closed upon return.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3939) Prevent distinct aggregates and grouping sets from being translated

2016-05-19 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-3939:


 Summary: Prevent distinct aggregates and grouping sets from being 
translated
 Key: FLINK-3939
 URL: https://issues.apache.org/jira/browse/FLINK-3939
 Project: Flink
  Issue Type: Bug
  Components: Table API
Reporter: Fabian Hueske
Assignee: Fabian Hueske
 Fix For: 1.1.0


Flink's SQL interface is currently not capable of executing distinct aggregates 
and grouping sets.

We need to prevent that queries with these operations are translated by 
adapting the DataSetAggregateRule.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3801) Upgrade Joda-Time library to 2.9.3

2016-05-19 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-3801:
--
Description: 
Currently yoda-time 2.5 is used which was very old.

We should upgrade to 2.9.3

  was:
Currently yoda-time 2.5 is used which was very old.


We should upgrade to 2.9.3


> Upgrade Joda-Time library to 2.9.3
> --
>
> Key: FLINK-3801
> URL: https://issues.apache.org/jira/browse/FLINK-3801
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Minor
>
> Currently yoda-time 2.5 is used which was very old.
> We should upgrade to 2.9.3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [script] Simple fix of typo from JobManager to...

2016-05-19 Thread hsaputra
Github user hsaputra commented on the pull request:

https://github.com/apache/flink/pull/2001#issuecomment-220454866
  
Thanks @StephanEwen , have merged the change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15292153#comment-15292153
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63956013
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import 
org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Jaccard Index measures the similarity between vertex neighborhoods.
+ * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are 
common).
+ * 
+ * This implementation produces similarity scores for each pair of vertices
+ * in the graph with at least one common neighbor; equivalently, this is 
the
+ * set of all non-zero Jaccard Similarity coefficients.
+ * 
+ * The input graph must be a simple, undirected graph containing no 
duplicate
+ * edges or self-loops.
+ *
+ * @param  graph ID type
+ * @param  vertex value type
+ * @param  edge value type
+ */
+public class JaccardIndex, VV, EV>
+implements GraphAlgorithm> {
+
+   public static final int DEFAULT_GROUP_SIZE = 64;
+
+   // Optional configuration
+   private int groupSize = DEFAULT_GROUP_SIZE;
+
+   private boolean unboundedScores = true;
+
+   private int minimumScoreNumerator = 0;
+
+   private int minimumScoreDenominator = 1;
+
+   private int maximumScoreNumerator = 1;
+
+   private int maximumScoreDenominator = 0;
+
+   private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+   /**
+* Override the default group size for the quadratic expansion of 
neighbor
+* pairs. Small groups generate more data whereas large groups 
distribute
+* computation less evenly among tasks.
+*
+* @param groupSize the group size for the quadratic expansion of 
neighbor pairs
+* @return this
+*/
+   public JaccardIndex setGroupSize(int groupSize) {
--- End diff --

I don't know that 64 is the best value but I think it is close. I clarified 
the javadoc to suggest that users not change this value.


> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity 
> scores. This algorithm is 

[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63956013
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import 
org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Jaccard Index measures the similarity between vertex neighborhoods.
+ * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are 
common).
+ * 
+ * This implementation produces similarity scores for each pair of vertices
+ * in the graph with at least one common neighbor; equivalently, this is 
the
+ * set of all non-zero Jaccard Similarity coefficients.
+ * 
+ * The input graph must be a simple, undirected graph containing no 
duplicate
+ * edges or self-loops.
+ *
+ * @param  graph ID type
+ * @param  vertex value type
+ * @param  edge value type
+ */
+public class JaccardIndex, VV, EV>
+implements GraphAlgorithm> {
+
+   public static final int DEFAULT_GROUP_SIZE = 64;
+
+   // Optional configuration
+   private int groupSize = DEFAULT_GROUP_SIZE;
+
+   private boolean unboundedScores = true;
+
+   private int minimumScoreNumerator = 0;
+
+   private int minimumScoreDenominator = 1;
+
+   private int maximumScoreNumerator = 1;
+
+   private int maximumScoreDenominator = 0;
+
+   private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+   /**
+* Override the default group size for the quadratic expansion of 
neighbor
+* pairs. Small groups generate more data whereas large groups 
distribute
+* computation less evenly among tasks.
+*
+* @param groupSize the group size for the quadratic expansion of 
neighbor pairs
+* @return this
+*/
+   public JaccardIndex setGroupSize(int groupSize) {
--- End diff --

I don't know that 64 is the best value but I think it is close. I clarified 
the javadoc to suggest that users not change this value.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63951105
  
--- Diff: 
flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Driver for the library implementation of Jaccard Index.
+ *
+ * This example generates an undirected RMat graph with the given scale and
+ * edge factor then calculates all non-zero Jaccard Index similarity scores
+ * between vertices.
+ *
+ * @see org.apache.flink.graph.library.similarity.JaccardIndex
+ */
+public class JaccardIndex {
+
+   public static final int DEFAULT_SCALE = 10;
+
+   public static final int DEFAULT_EDGE_FACTOR = 16;
+
+   public static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+   public static void main(String[] args) throws Exception {
+   // Set up the execution environment
+   final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.getConfig().enableObjectReuse();
+
+   ParameterTool parameters = ParameterTool.fromArgs(args);
+
+   // Generate RMat graph
+   int scale = parameters.getInt("scale", DEFAULT_SCALE);
+   int edgeFactor = parameters.getInt("edge_factor", 
DEFAULT_EDGE_FACTOR);
+
+   RandomGenerableFactory rnd = new 
JDKRandomGeneratorFactory();
+
+   long vertexCount = 1L << scale;
+   long edgeCount = vertexCount * edgeFactor;
+
+   boolean clipAndFlip = parameters.getBoolean("clip_and_flip", 
DEFAULT_CLIP_AND_FLIP);
+
+   Graph graph = new 
RMatGraph<>(env, rnd, vertexCount, edgeCount)
+   .setSimpleGraph(true, clipAndFlip)
+   .generate();
+
+   DataSet js;
+
+   if (scale > 32) {
+   js = graph
+   .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex());
+   } else {
+   js = graph
+   .run(new TranslateGraphIds(new LongValueToIntValue()))
+   .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex());
+   }
+
+   switch (parameters.get("output", "")) {
--- End diff --

Is the usage statement sufficient?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15292103#comment-15292103
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63951105
  
--- Diff: 
flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Driver for the library implementation of Jaccard Index.
+ *
+ * This example generates an undirected RMat graph with the given scale and
+ * edge factor then calculates all non-zero Jaccard Index similarity scores
+ * between vertices.
+ *
+ * @see org.apache.flink.graph.library.similarity.JaccardIndex
+ */
+public class JaccardIndex {
+
+   public static final int DEFAULT_SCALE = 10;
+
+   public static final int DEFAULT_EDGE_FACTOR = 16;
+
+   public static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+   public static void main(String[] args) throws Exception {
+   // Set up the execution environment
+   final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.getConfig().enableObjectReuse();
+
+   ParameterTool parameters = ParameterTool.fromArgs(args);
+
+   // Generate RMat graph
+   int scale = parameters.getInt("scale", DEFAULT_SCALE);
+   int edgeFactor = parameters.getInt("edge_factor", 
DEFAULT_EDGE_FACTOR);
+
+   RandomGenerableFactory rnd = new 
JDKRandomGeneratorFactory();
+
+   long vertexCount = 1L << scale;
+   long edgeCount = vertexCount * edgeFactor;
+
+   boolean clipAndFlip = parameters.getBoolean("clip_and_flip", 
DEFAULT_CLIP_AND_FLIP);
+
+   Graph graph = new 
RMatGraph<>(env, rnd, vertexCount, edgeCount)
+   .setSimpleGraph(true, clipAndFlip)
+   .generate();
+
+   DataSet js;
+
+   if (scale > 32) {
+   js = graph
+   .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex());
+   } else {
+   js = graph
+   .run(new TranslateGraphIds(new LongValueToIntValue()))
+   .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex());
+   }
+
+   switch (parameters.get("output", "")) {
--- End diff --

Is the usage statement sufficient?


> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: 

[GitHub] flink pull request: [script] Simple fix of typo from JobManager to...

2016-05-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2001


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-3935) Invalid check of key and ordering fields in PartitionNode

2016-05-19 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-3935.

Resolution: Fixed

Fixed with 853a5359eb76a987f3fee33065b34051c2fa6094

> Invalid check of key and ordering fields in PartitionNode
> -
>
> Key: FLINK-3935
> URL: https://issues.apache.org/jira/browse/FLINK-3935
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
> Fix For: 1.1.0
>
>
> {{PartitionNode}} checks for range partitioning that partition keys and the 
> fields of the ordering are identical. However the check is not correctly 
> implemented because {{PartitionNode}} holds the partition keys as an 
> unordered {{FieldSet}} which is compared against an ordered {{FieldList}} 
> provided by the {{Ordering}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3934) Prevent translation of non-equi joins in DataSetJoinRule

2016-05-19 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-3934.

   Resolution: Fixed
Fix Version/s: 1.1.0

Fixed with 3d6ce294123d90ca7bb029d4041f29a1ae1ccd81

> Prevent translation of non-equi joins in DataSetJoinRule
> 
>
> Key: FLINK-3934
> URL: https://issues.apache.org/jira/browse/FLINK-3934
> Project: Flink
>  Issue Type: Bug
>  Components: Table API
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
> Fix For: 1.1.0
>
>
> At the moment, also non-equi joins are translated into {{DataSetJoin}}s. 
> To prevent such plans from being picked, we assign huge costs and eventually 
> fail their translation into DataSet programs.
> A better solution is to prevent a non-equi join from being translated into a 
> {{DataSetJoin}} in the {{DataSetJoinRule}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3836) Change Histogram to enable Long counters

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291594#comment-15291594
 ] 

ASF GitHub Bot commented on FLINK-3836:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1966#issuecomment-220395032
  
I think for now, we should add a `@Deprecated` annotation to the 
`Histogram` and its related method on `RuntimeContext` and mention in the 
comment that we encourage the `LongHistogram` instead.

On a major release, we should go through all deprecated parts and remove 
them.


> Change Histogram to enable Long counters
> 
>
> Key: FLINK-3836
> URL: https://issues.apache.org/jira/browse/FLINK-3836
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.2
>Reporter: Maximilian Bode
>Assignee: Maximilian Bode
>Priority: Minor
>
> Change 
> flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java
>  to enable Long counts instead of Integer. In particular, change the TreeMap 
> to be .



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3836] Add LongHistogram accumulator

2016-05-19 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1966#issuecomment-220395032
  
I think for now, we should add a `@Deprecated` annotation to the 
`Histogram` and its related method on `RuntimeContext` and mention in the 
comment that we encourage the `LongHistogram` instead.

On a major release, we should go through all deprecated parts and remove 
them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3909) Maven Failsafe plugin may report SUCCESS on failed tests

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291517#comment-15291517
 ] 

ASF GitHub Bot commented on FLINK-3909:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/2003#issuecomment-220388275
  
I get segfaults from RocksDB in my Travis tests:  
https://travis-ci.org/mxm/flink/builds/131412162

Let's see how the builds here go. All in all, looks ready to be merged.


> Maven Failsafe plugin may report SUCCESS on failed tests
> 
>
> Key: FLINK-3909
> URL: https://issues.apache.org/jira/browse/FLINK-3909
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>
> The following build completed successfully on Travis but there are actually 
> test failures: https://travis-ci.org/apache/flink/jobs/129943398#L5402



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3909] update Maven Failsafe version

2016-05-19 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/2003#issuecomment-220388275
  
I get segfaults from RocksDB in my Travis tests:  
https://travis-ci.org/mxm/flink/builds/131412162

Let's see how the builds here go. All in all, looks ready to be merged.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3927) TaskManager registration may fail if Yarn versions don't match

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291484#comment-15291484
 ] 

ASF GitHub Bot commented on FLINK-3927:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2013

[FLINK-3927][yarn] make container id consistent across Hadoop versions

Fixes a bug where the container id generation would vary across Hadoop 
versions of the client/cluster. The ResourceManager assumes a persistent 
resource id.

Based on #2012, to re-enable the Yarn tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-3927

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2013.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2013


commit 422e078c93b558dba3d0c6a53643824198e2c545
Author: Maximilian Michels 
Date:   2016-05-19T12:29:12Z

[FLINK-3927][yarn] make container id consistent across Hadoop versions

- introduce a unique container id independent of the Hadoop version
- improve printing of exceptions during registration
- minor improvements to the Yarn ResourceManager code

commit c27fc8553f4dc0fbcee09c52848477cff2de0b11
Author: Maximilian Michels 
Date:   2016-05-19T15:59:23Z

[FLINK-3938] re-enable Yarn tests

As of 70978f560fa5cab6d84ec27d58faa2627babd362, the Yarn tests were not
executed anymore. They were moved to the test directory but there was
still a Maven configuration in place to change the test directory.




> TaskManager registration may fail if Yarn versions don't match
> --
>
> Key: FLINK-3927
> URL: https://issues.apache.org/jira/browse/FLINK-3927
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>
> Flink's ResourceManager uses the Yarn container ids to identify connecting 
> task managers. Yarn's stringified container id may not be consistent across 
> different Hadoop versions, e.g. Hadoop 2.3.0 and Hadoop 2.7.1. The 
> ResourceManager gets it from the Yarn reports while the TaskManager infers it 
> from the Yarn environment variables. The ResourceManager may use Hadoop 2.3.0 
> version while the cluster runs Hadoop 2.7.1. 
> The solution is to pass the ID through a custom environment variable which is 
> set by the ResourceManager before launching the TaskManager in the container. 
> That way we will always use the Hadoop client's id generation method.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3927][yarn] make container id consisten...

2016-05-19 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2013

[FLINK-3927][yarn] make container id consistent across Hadoop versions

Fixes a bug where the container id generation would vary across Hadoop 
versions of the client/cluster. The ResourceManager assumes a persistent 
resource id.

Based on #2012, to re-enable the Yarn tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-3927

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2013.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2013


commit 422e078c93b558dba3d0c6a53643824198e2c545
Author: Maximilian Michels 
Date:   2016-05-19T12:29:12Z

[FLINK-3927][yarn] make container id consistent across Hadoop versions

- introduce a unique container id independent of the Hadoop version
- improve printing of exceptions during registration
- minor improvements to the Yarn ResourceManager code

commit c27fc8553f4dc0fbcee09c52848477cff2de0b11
Author: Maximilian Michels 
Date:   2016-05-19T15:59:23Z

[FLINK-3938] re-enable Yarn tests

As of 70978f560fa5cab6d84ec27d58faa2627babd362, the Yarn tests were not
executed anymore. They were moved to the test directory but there was
still a Maven configuration in place to change the test directory.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3190) Retry rate limits for DataStream API

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291431#comment-15291431
 ] 

ASF GitHub Bot commented on FLINK-3190:
---

Github user fijolekProjects closed the pull request at:

https://github.com/apache/flink/pull/1954


> Retry rate limits for DataStream API
> 
>
> Key: FLINK-3190
> URL: https://issues.apache.org/jira/browse/FLINK-3190
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Michał Fijołek
>Priority: Minor
>
> For a long running stream processing job, absolute numbers of retries don't 
> make much sense: The job will accumulate transient errors over time and will 
> die eventually when thresholds are exceeded. Rate limits are better suited in 
> this scenario: A job should only die, if it fails too often in a given time 
> frame. To better overcome transient errors, retry delays could be used, as 
> suggested in other issues.
> Absolute numbers of retries can still make sense, if failing operators don't 
> make any progress at all. We can measure progress by OperatorState changes 
> and by observing output, as long as the operator in question is not a sink. 
> If operator state changes and/or operator produces output, we can assume it 
> makes progress.
> As an example, let's say we configured a retry rate limit of 10 retries per 
> hour and a non-sink operator A. If the operator fails once every 10 minutes 
> and produces output between failures, it should not lead to job termination. 
> But if the operator fails 11 times in an hour or does not produce output 
> between 11 consecutive failures, job should be terminated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3190) Retry rate limits for DataStream API

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291432#comment-15291432
 ] 

ASF GitHub Bot commented on FLINK-3190:
---

GitHub user fijolekProjects reopened a pull request:

https://github.com/apache/flink/pull/1954

[FLINK-3190] failure rate restart strategy

Failure rate restart strategy - job should only die, if it fails too often 
in a given time frame

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fijolekProjects/flink FLINK-3190

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1954.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1954


commit 1c78e8cbee8cc842daac3bd72891dbf7a515bf21
Author: Michal Fijolek 
Date:   2016-03-13T00:40:15Z

[FLINK-3190] failure rate restart strategy




> Retry rate limits for DataStream API
> 
>
> Key: FLINK-3190
> URL: https://issues.apache.org/jira/browse/FLINK-3190
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Assignee: Michał Fijołek
>Priority: Minor
>
> For a long running stream processing job, absolute numbers of retries don't 
> make much sense: The job will accumulate transient errors over time and will 
> die eventually when thresholds are exceeded. Rate limits are better suited in 
> this scenario: A job should only die, if it fails too often in a given time 
> frame. To better overcome transient errors, retry delays could be used, as 
> suggested in other issues.
> Absolute numbers of retries can still make sense, if failing operators don't 
> make any progress at all. We can measure progress by OperatorState changes 
> and by observing output, as long as the operator in question is not a sink. 
> If operator state changes and/or operator produces output, we can assume it 
> makes progress.
> As an example, let's say we configured a retry rate limit of 10 retries per 
> hour and a non-sink operator A. If the operator fails once every 10 minutes 
> and produces output between failures, it should not lead to job termination. 
> But if the operator fails 11 times in an hour or does not produce output 
> between 11 consecutive failures, job should be terminated.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3190] failure rate restart strategy

2016-05-19 Thread fijolekProjects
GitHub user fijolekProjects reopened a pull request:

https://github.com/apache/flink/pull/1954

[FLINK-3190] failure rate restart strategy

Failure rate restart strategy - job should only die, if it fails too often 
in a given time frame

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fijolekProjects/flink FLINK-3190

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1954.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1954


commit 1c78e8cbee8cc842daac3bd72891dbf7a515bf21
Author: Michal Fijolek 
Date:   2016-03-13T00:40:15Z

[FLINK-3190] failure rate restart strategy




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3190] failure rate restart strategy

2016-05-19 Thread fijolekProjects
Github user fijolekProjects closed the pull request at:

https://github.com/apache/flink/pull/1954


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3938) Yarn tests don't run on the current master

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291368#comment-15291368
 ] 

ASF GitHub Bot commented on FLINK-3938:
---

GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2012

[FLINK-3938] re-enable Yarn tests

Fixes a regression of #1915.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-3938

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2012.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2012


commit a2708f1c2e2f59a92f39d32a5e302b7f57267678
Author: Maximilian Michels 
Date:   2016-05-19T15:59:23Z

[FLINK-3938] re-enable Yarn tests

As of 70978f560fa5cab6d84ec27d58faa2627babd362, the Yarn tests were not
executed anymore. They were moved to the test directory but there was
still a Maven configuration in place to change the test directory.




> Yarn tests don't run on the current master
> --
>
> Key: FLINK-3938
> URL: https://issues.apache.org/jira/browse/FLINK-3938
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
>Priority: Critical
> Fix For: 1.1.0
>
>
> Independently of FLINK-3909, I just discovered that the Yarn tests don't run 
> on the current master (09b428b).
> {noformat}
> [INFO] 
> 
> [INFO] Building flink-yarn-tests 1.1-SNAPSHOT
> [INFO] 
> 
> [INFO] 
> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ 
> flink-yarn-tests_2.10 ---
> [INFO] 
> [INFO] --- maven-checkstyle-plugin:2.16:check (validate) @ 
> flink-yarn-tests_2.10 ---
> [INFO] 
> [INFO] --- maven-enforcer-plugin:1.3.1:enforce (enforce-maven) @ 
> flink-yarn-tests_2.10 ---
> [INFO] 
> [INFO] --- build-helper-maven-plugin:1.7:add-source (add-source) @ 
> flink-yarn-tests_2.10 ---
> [INFO] Source directory: 
> /home/travis/build/apache/flink/flink-yarn-tests/src/main/scala added.
> [INFO] 
> [INFO] --- maven-remote-resources-plugin:1.5:process (default) @ 
> flink-yarn-tests_2.10 ---
> [INFO] 
> [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ 
> flink-yarn-tests_2.10 ---
> [INFO] Using 'UTF-8' encoding to copy filtered resources.
> [INFO] skip non existing resourceDirectory 
> /home/travis/build/apache/flink/flink-yarn-tests/src/main/resources
> [INFO] Copying 3 resources
> [INFO] 
> [INFO] --- scala-maven-plugin:3.1.4:compile (scala-compile-first) @ 
> flink-yarn-tests_2.10 ---
> [INFO] No sources to compile
> [INFO] 
> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ 
> flink-yarn-tests_2.10 ---
> [INFO] No sources to compile
> [INFO] 
> [INFO] --- build-helper-maven-plugin:1.7:add-test-source (add-test-source) @ 
> flink-yarn-tests_2.10 ---
> [INFO] Test Source directory: 
> /home/travis/build/apache/flink/flink-yarn-tests/src/test/scala added.
> [INFO] 
> [INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ 
> flink-yarn-tests_2.10 ---
> [INFO] Using 'UTF-8' encoding to copy filtered resources.
> [INFO] Copying 1 resource
> [INFO] Copying 3 resources
> [INFO] 
> [INFO] --- scala-maven-plugin:3.1.4:testCompile (scala-test-compile) @ 
> flink-yarn-tests_2.10 ---
> [INFO] /home/travis/build/apache/flink/flink-yarn-tests/src/test/scala:-1: 
> info: compiling
> [INFO] Compiling 2 source files to 
> /home/travis/build/apache/flink/flink-yarn-tests/target/test-classes at 
> 1463615798796
> [INFO] prepare-compile in 0 s
> [INFO] compile in 9 s
> [INFO] 
> [INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ 
> flink-yarn-tests_2.10 ---
> [INFO] Nothing to compile - all classes are up to date
> [INFO] 
> [INFO] --- maven-surefire-plugin:2.18.1:test (default-test) @ 
> flink-yarn-tests_2.10 ---
> [INFO] Surefire report directory: 
> /home/travis/build/apache/flink/flink-yarn-tests/target/surefire-reports
> [WARNING] The system property log4j.configuration is configured twice! The 
> property appears in  and any of , 
>  or user property.
> ---
>  T E S T S
> ---
> Results :
> Tests run: 0, Failures: 0, Errors: 0, Skipped: 0
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3938] re-enable Yarn tests

2016-05-19 Thread mxm
GitHub user mxm opened a pull request:

https://github.com/apache/flink/pull/2012

[FLINK-3938] re-enable Yarn tests

Fixes a regression of #1915.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mxm/flink FLINK-3938

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2012.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2012


commit a2708f1c2e2f59a92f39d32a5e302b7f57267678
Author: Maximilian Michels 
Date:   2016-05-19T15:59:23Z

[FLINK-3938] re-enable Yarn tests

As of 70978f560fa5cab6d84ec27d58faa2627babd362, the Yarn tests were not
executed anymore. They were moved to the test directory but there was
still a Maven configuration in place to change the test directory.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-3932) Implement State Backend Security

2016-05-19 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  updated FLINK-3932:

Description: 
_This issue is part of a series of improvements detailed in the [Secure Data 
Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
 design doc._

Flink should protect its HA, checkpoint, and savepoint state against 
unauthorized access.

As described in the design doc, implement:
- ZooKeeper authentication w/ Kerberos
- ZooKeeper authorization (i.e. znode ACLs)
- Checkpoint/savepoint data protection




  was:
_This issue is part of a series of improvements detailed the [Secure Data 
Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
 design doc._

Flink should protect its HA, checkpoint, and savepoint state against 
unauthorized access.

As described in the design doc, implement:
- ZooKeeper authentication w/ Kerberos
- ZooKeeper authorization (i.e. znode ACLs)
- Checkpoint/savepoint data protection





> Implement State Backend Security
> 
>
> Key: FLINK-3932
> URL: https://issues.apache.org/jira/browse/FLINK-3932
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>  Labels: security
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Flink should protect its HA, checkpoint, and savepoint state against 
> unauthorized access.
> As described in the design doc, implement:
> - ZooKeeper authentication w/ Kerberos
> - ZooKeeper authorization (i.e. znode ACLs)
> - Checkpoint/savepoint data protection



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3931) Implement Transport Encryption (SSL/TLS)

2016-05-19 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  updated FLINK-3931:

Description: 
_This issue is part of a series of improvements detailed in the [Secure Data 
Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
 design doc._

To assure privacy and data integrity between Flink components, enable TLS for 
all communication channels.  As described in the design doc:
- Accept a configured certificate or generate a certificate.
- Enable Akka SSL
- Implement Data Transfer SSL
- Implement Blob Server SSL
- Implement Web UI HTTPS


  was:
_This issue is part of a series of improvements detailed the [Secure Data 
Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
 design doc._

To assure privacy and data integrity between Flink components, enable TLS for 
all communication channels.  As described in the design doc:
- Accept a configured certificate or generate a certificate.
- Enable Akka SSL
- Implement Data Transfer SSL
- Implement Blob Server SSL
- Implement Web UI HTTPS



> Implement Transport Encryption (SSL/TLS)
> 
>
> Key: FLINK-3931
> URL: https://issues.apache.org/jira/browse/FLINK-3931
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>  Labels: security
>   Original Estimate: 1,008h
>  Remaining Estimate: 1,008h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> To assure privacy and data integrity between Flink components, enable TLS for 
> all communication channels.  As described in the design doc:
> - Accept a configured certificate or generate a certificate.
> - Enable Akka SSL
> - Implement Data Transfer SSL
> - Implement Blob Server SSL
> - Implement Web UI HTTPS



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3930) Implement Service-Level Authorization

2016-05-19 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  updated FLINK-3930:

Description: 
_This issue is part of a series of improvements detailed in the [Secure Data 
Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
 design doc._

Service-level authorization is the initial authorization mechanism to ensure 
clients (or servers) connecting to the Flink cluster are authorized to do so.   
The purpose is to prevent a cluster from being used by an unauthorized user, 
whether to execute jobs, disrupt cluster functionality, or gain access to 
secrets stored within the cluster.

Implement service-level authorization as described in the design doc.
- Introduce a shared secret cookie
- Enable Akka security cookie
- Implement data transfer authentication
- Secure the web dashboard


  was:
_This issue is part of a series of improvements detailed the [Secure Data 
Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
 design doc._

Service-level authorization is the initial authorization mechanism to ensure 
clients (or servers) connecting to the Flink cluster are authorized to do so.   
The purpose is to prevent a cluster from being used by an unauthorized user, 
whether to execute jobs, disrupt cluster functionality, or gain access to 
secrets stored within the cluster.

Implement service-level authorization as described in the design doc.
- Introduce a shared secret cookie
- Enable Akka security cookie
- Implement data transfer authentication
- Secure the web dashboard



> Implement Service-Level Authorization
> -
>
> Key: FLINK-3930
> URL: https://issues.apache.org/jira/browse/FLINK-3930
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>  Labels: security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Service-level authorization is the initial authorization mechanism to ensure 
> clients (or servers) connecting to the Flink cluster are authorized to do so. 
>   The purpose is to prevent a cluster from being used by an unauthorized 
> user, whether to execute jobs, disrupt cluster functionality, or gain access 
> to secrets stored within the cluster.
> Implement service-level authorization as described in the design doc.
> - Introduce a shared secret cookie
> - Enable Akka security cookie
> - Implement data transfer authentication
> - Secure the web dashboard



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-05-19 Thread Eron Wright (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Eron Wright  updated FLINK-3929:

Description: 
_This issue is part of a series of improvements detailed in the [Secure Data 
Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
 design doc._

Add support for a keytab credential to be associated with the Flink cluster, to 
facilitate:
- Kerberos-authenticated data access for connectors
- Kerberos-authenticated ZooKeeper access

Support both the standalone and YARN deployment modes.
 

  was:
_This issue is part of a series of improvements detailed the [Secure Data 
Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
 design doc._

Add support for a keytab credential to be associated with the Flink cluster, to 
facilitate:
- Kerberos-authenticated data access for connectors
- Kerberos-authenticated ZooKeeper access

Support both the standalone and YARN deployment modes.
 


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3922) Infinite recursion on TypeExtractor

2016-05-19 Thread Flavio Pompermaier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291351#comment-15291351
 ] 

Flavio Pompermaier commented on FLINK-3922:
---

I think that if you can run the code I attached successfully then you can close 
it :)

However, maybe I'd add a unit test for it...

> Infinite recursion on TypeExtractor
> ---
>
> Key: FLINK-3922
> URL: https://issues.apache.org/jira/browse/FLINK-3922
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Timo Walther
>Priority: Critical
>
> This program cause a StackOverflow (infinite recursion) in the TypeExtractor:
> {code:title=TypeSerializerStackOverflowOnRecursivePojo.java|borderStyle=solid}
> public class TypeSerializerStackOverflowOnRecursivePojo {
>   public static class RecursivePojo implements Serializable {
>   private static final long serialVersionUID = 1L;
>   
>   private RecursivePojo parent;
>   public RecursivePojo(){}
>   public RecursivePojo(K k, V v) {
>   }
>   public RecursivePojo getParent() {
>   return parent;
>   }
>   public void setParent(RecursivePojo parent) {
>   this.parent = parent;
>   }
>   
>   }
>   public static class TypedTuple extends Tuple3 RecursivePojo>>{
>   private static final long serialVersionUID = 1L;
>   }
>   
>   public static void main(String[] args) throws Exception {
>   ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
>   env.fromCollection(Arrays.asList(new RecursivePojo Map>("test",new HashMap(
>   .map(t-> {TypedTuple ret = new TypedTuple();ret.setFields("1", 
> "1", t);return ret;}).returns(TypedTuple.class)
>   .print();
>   }
>   
> }
> {code}
> The thrown Exception is the following:
> {code:title=Exception thrown}
> Exception in thread "main" java.lang.StackOverflowError
>   at 
> sun.reflect.generics.parser.SignatureParser.parsePackageNameAndSimpleClassTypeSignature(SignatureParser.java:328)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseClassTypeSignature(SignatureParser.java:310)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:289)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:283)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseTypeSignature(SignatureParser.java:485)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseReturnType(SignatureParser.java:627)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseMethodTypeSignature(SignatureParser.java:577)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseMethodSig(SignatureParser.java:171)
>   at 
> sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:55)
>   at 
> sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:43)
>   at 
> sun.reflect.generics.repository.AbstractRepository.(AbstractRepository.java:74)
>   at 
> sun.reflect.generics.repository.GenericDeclRepository.(GenericDeclRepository.java:49)
>   at 
> sun.reflect.generics.repository.ConstructorRepository.(ConstructorRepository.java:51)
>   at 
> sun.reflect.generics.repository.MethodRepository.(MethodRepository.java:46)
>   at 
> sun.reflect.generics.repository.MethodRepository.make(MethodRepository.java:59)
>   at java.lang.reflect.Method.getGenericInfo(Method.java:102)
>   at java.lang.reflect.Method.getGenericReturnType(Method.java:255)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.isValidPojoField(TypeExtractor.java:1610)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1671)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>   at 
> 

[jira] [Created] (FLINK-3938) Yarn tests don't run on the current master

2016-05-19 Thread Maximilian Michels (JIRA)
Maximilian Michels created FLINK-3938:
-

 Summary: Yarn tests don't run on the current master
 Key: FLINK-3938
 URL: https://issues.apache.org/jira/browse/FLINK-3938
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.1.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Priority: Critical
 Fix For: 1.1.0


Independently of FLINK-3909, I just discovered that the Yarn tests don't run on 
the current master (09b428b).

{noformat}
[INFO] 
[INFO] Building flink-yarn-tests 1.1-SNAPSHOT
[INFO] 
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ flink-yarn-tests_2.10 
---
[INFO] 
[INFO] --- maven-checkstyle-plugin:2.16:check (validate) @ 
flink-yarn-tests_2.10 ---
[INFO] 
[INFO] --- maven-enforcer-plugin:1.3.1:enforce (enforce-maven) @ 
flink-yarn-tests_2.10 ---
[INFO] 
[INFO] --- build-helper-maven-plugin:1.7:add-source (add-source) @ 
flink-yarn-tests_2.10 ---
[INFO] Source directory: 
/home/travis/build/apache/flink/flink-yarn-tests/src/main/scala added.
[INFO] 
[INFO] --- maven-remote-resources-plugin:1.5:process (default) @ 
flink-yarn-tests_2.10 ---
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ 
flink-yarn-tests_2.10 ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory 
/home/travis/build/apache/flink/flink-yarn-tests/src/main/resources
[INFO] Copying 3 resources
[INFO] 
[INFO] --- scala-maven-plugin:3.1.4:compile (scala-compile-first) @ 
flink-yarn-tests_2.10 ---
[INFO] No sources to compile
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ 
flink-yarn-tests_2.10 ---
[INFO] No sources to compile
[INFO] 
[INFO] --- build-helper-maven-plugin:1.7:add-test-source (add-test-source) @ 
flink-yarn-tests_2.10 ---
[INFO] Test Source directory: 
/home/travis/build/apache/flink/flink-yarn-tests/src/test/scala added.
[INFO] 
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ 
flink-yarn-tests_2.10 ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] Copying 3 resources
[INFO] 
[INFO] --- scala-maven-plugin:3.1.4:testCompile (scala-test-compile) @ 
flink-yarn-tests_2.10 ---
[INFO] /home/travis/build/apache/flink/flink-yarn-tests/src/test/scala:-1: 
info: compiling
[INFO] Compiling 2 source files to 
/home/travis/build/apache/flink/flink-yarn-tests/target/test-classes at 
1463615798796
[INFO] prepare-compile in 0 s
[INFO] compile in 9 s
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ 
flink-yarn-tests_2.10 ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- maven-surefire-plugin:2.18.1:test (default-test) @ 
flink-yarn-tests_2.10 ---
[INFO] Surefire report directory: 
/home/travis/build/apache/flink/flink-yarn-tests/target/surefire-reports
[WARNING] The system property log4j.configuration is configured twice! The 
property appears in  and any of , 
 or user property.

---
 T E S T S
---

Results :

Tests run: 0, Failures: 0, Errors: 0, Skipped: 0
{noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291312#comment-15291312
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63901325
  
--- Diff: 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JaccardIndexTest
+extends AsmTestBase {
+
+   @Test
+   public void testSimpleGraph()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex());
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMinimumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMinimumScore(1, 2));
+
+   String expectedResult =
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMaximumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMaximumScore(1, 2));
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testCompleteGraph()
+   throws Exception {
+   DataSet ji = completeGraph
+   .run(new JaccardIndex()
+   .setGroupSize(4));
+
+   for (Result result : ji.collect()) {
 

[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63901325
  
--- Diff: 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JaccardIndexTest
+extends AsmTestBase {
+
+   @Test
+   public void testSimpleGraph()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex());
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMinimumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMinimumScore(1, 2));
+
+   String expectedResult =
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMaximumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMaximumScore(1, 2));
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testCompleteGraph()
+   throws Exception {
+   DataSet ji = completeGraph
+   .run(new JaccardIndex()
+   .setGroupSize(4));
+
+   for (Result result : ji.collect()) {
+   // the intersection includes every vertex
+   assertEquals(completeGraphVertexCount, 
result.getDistinctNeighborCount().getValue());
+
+   // the union only excludes the 

[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291289#comment-15291289
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63899403
  
--- Diff: 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JaccardIndexTest
+extends AsmTestBase {
+
+   @Test
+   public void testSimpleGraph()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex());
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMinimumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMinimumScore(1, 2));
+
+   String expectedResult =
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMaximumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMaximumScore(1, 2));
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testCompleteGraph()
+   throws Exception {
+   DataSet ji = completeGraph
+   .run(new JaccardIndex()
+   .setGroupSize(4));
+
+   for (Result result : ji.collect()) {

[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63899403
  
--- Diff: 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JaccardIndexTest
+extends AsmTestBase {
+
+   @Test
+   public void testSimpleGraph()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex());
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMinimumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMinimumScore(1, 2));
+
+   String expectedResult =
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMaximumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMaximumScore(1, 2));
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testCompleteGraph()
+   throws Exception {
+   DataSet ji = completeGraph
+   .run(new JaccardIndex()
+   .setGroupSize(4));
+
+   for (Result result : ji.collect()) {
+   // the intersection includes every vertex
+   assertEquals(completeGraphVertexCount, 
result.getDistinctNeighborCount().getValue());
+
+   // the union only excludes the two 

[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291281#comment-15291281
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63898835
  
--- Diff: 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JaccardIndexTest
+extends AsmTestBase {
+
+   @Test
+   public void testSimpleGraph()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex());
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMinimumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMinimumScore(1, 2));
+
+   String expectedResult =
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMaximumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMaximumScore(1, 2));
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testCompleteGraph()
+   throws Exception {
+   DataSet ji = completeGraph
+   .run(new JaccardIndex()
+   .setGroupSize(4));
+
+   for (Result result : ji.collect()) {
 

[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63898835
  
--- Diff: 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JaccardIndexTest
+extends AsmTestBase {
+
+   @Test
+   public void testSimpleGraph()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex());
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMinimumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMinimumScore(1, 2));
+
+   String expectedResult =
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMaximumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMaximumScore(1, 2));
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testCompleteGraph()
+   throws Exception {
+   DataSet ji = completeGraph
+   .run(new JaccardIndex()
+   .setGroupSize(4));
+
+   for (Result result : ji.collect()) {
+   // the intersection includes every vertex
+   assertEquals(completeGraphVertexCount, 
result.getDistinctNeighborCount().getValue());
+
+   // the union only excludes the 

[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291259#comment-15291259
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63896459
  
--- Diff: 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JaccardIndexTest
+extends AsmTestBase {
+
+   @Test
+   public void testSimpleGraph()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex());
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMinimumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMinimumScore(1, 2));
+
+   String expectedResult =
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMaximumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMaximumScore(1, 2));
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testCompleteGraph()
+   throws Exception {
+   DataSet ji = completeGraph
+   .run(new JaccardIndex()
+   .setGroupSize(4));
+
+   for (Result result : ji.collect()) {

[jira] [Commented] (FLINK-3922) Infinite recursion on TypeExtractor

2016-05-19 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291260#comment-15291260
 ] 

Timo Walther commented on FLINK-3922:
-

[~f.pompermaier] I have opened a PR. Can you confirm that it solves your 
problem?

> Infinite recursion on TypeExtractor
> ---
>
> Key: FLINK-3922
> URL: https://issues.apache.org/jira/browse/FLINK-3922
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Timo Walther
>Priority: Critical
>
> This program cause a StackOverflow (infinite recursion) in the TypeExtractor:
> {code:title=TypeSerializerStackOverflowOnRecursivePojo.java|borderStyle=solid}
> public class TypeSerializerStackOverflowOnRecursivePojo {
>   public static class RecursivePojo implements Serializable {
>   private static final long serialVersionUID = 1L;
>   
>   private RecursivePojo parent;
>   public RecursivePojo(){}
>   public RecursivePojo(K k, V v) {
>   }
>   public RecursivePojo getParent() {
>   return parent;
>   }
>   public void setParent(RecursivePojo parent) {
>   this.parent = parent;
>   }
>   
>   }
>   public static class TypedTuple extends Tuple3 RecursivePojo>>{
>   private static final long serialVersionUID = 1L;
>   }
>   
>   public static void main(String[] args) throws Exception {
>   ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
>   env.fromCollection(Arrays.asList(new RecursivePojo Map>("test",new HashMap(
>   .map(t-> {TypedTuple ret = new TypedTuple();ret.setFields("1", 
> "1", t);return ret;}).returns(TypedTuple.class)
>   .print();
>   }
>   
> }
> {code}
> The thrown Exception is the following:
> {code:title=Exception thrown}
> Exception in thread "main" java.lang.StackOverflowError
>   at 
> sun.reflect.generics.parser.SignatureParser.parsePackageNameAndSimpleClassTypeSignature(SignatureParser.java:328)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseClassTypeSignature(SignatureParser.java:310)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:289)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:283)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseTypeSignature(SignatureParser.java:485)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseReturnType(SignatureParser.java:627)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseMethodTypeSignature(SignatureParser.java:577)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseMethodSig(SignatureParser.java:171)
>   at 
> sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:55)
>   at 
> sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:43)
>   at 
> sun.reflect.generics.repository.AbstractRepository.(AbstractRepository.java:74)
>   at 
> sun.reflect.generics.repository.GenericDeclRepository.(GenericDeclRepository.java:49)
>   at 
> sun.reflect.generics.repository.ConstructorRepository.(ConstructorRepository.java:51)
>   at 
> sun.reflect.generics.repository.MethodRepository.(MethodRepository.java:46)
>   at 
> sun.reflect.generics.repository.MethodRepository.make(MethodRepository.java:59)
>   at java.lang.reflect.Method.getGenericInfo(Method.java:102)
>   at java.lang.reflect.Method.getGenericReturnType(Method.java:255)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.isValidPojoField(TypeExtractor.java:1610)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1671)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
>   at 
> org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
>   at 
> 

[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63896459
  
--- Diff: 
flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
 ---
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class JaccardIndexTest
+extends AsmTestBase {
+
+   @Test
+   public void testSimpleGraph()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex());
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMinimumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMinimumScore(1, 2));
+
+   String expectedResult =
+   "(0,3,(2,4))\n" +
+   "(1,2,(2,4))\n" +
+   "(4,5,(1,1))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testSimpleGraphWithMaximumScore()
+   throws Exception {
+   DataSet ji = undirectedSimpleGraph
+   .run(new JaccardIndex()
+   .setMaximumScore(1, 2));
+
+   String expectedResult =
+   "(0,1,(1,4))\n" +
+   "(0,2,(1,4))\n" +
+   "(1,3,(1,6))\n" +
+   "(1,4,(1,3))\n" +
+   "(1,5,(1,3))\n" +
+   "(2,3,(1,6))\n" +
+   "(2,4,(1,3))\n" +
+   "(2,5,(1,3))\n";
+
+   TestBaseUtils.compareResultAsText(ji.collect(), expectedResult);
+   }
+
+   @Test
+   public void testCompleteGraph()
+   throws Exception {
+   DataSet ji = completeGraph
+   .run(new JaccardIndex()
+   .setGroupSize(4));
+
+   for (Result result : ji.collect()) {
+   // the intersection includes every vertex
+   assertEquals(completeGraphVertexCount, 
result.getDistinctNeighborCount().getValue());
+
+   // the union only excludes the two 

[jira] [Commented] (FLINK-3922) Infinite recursion on TypeExtractor

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291251#comment-15291251
 ] 

ASF GitHub Bot commented on FLINK-3922:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/2011

[FLINK-3922] [types] Infinite recursion on TypeExtractor

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

Fixes a special bug if a parameterized Pojo contains a recursive field of 
same type.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-3922

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2011.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2011


commit 91b988a240f7a71528ea16c3ffb5d605e0d920de
Author: twalthr 
Date:   2016-05-19T15:01:57Z

[FLINK-3922] [types] Infinite recursion on TypeExtractor




> Infinite recursion on TypeExtractor
> ---
>
> Key: FLINK-3922
> URL: https://issues.apache.org/jira/browse/FLINK-3922
> Project: Flink
>  Issue Type: Bug
>  Components: Type Serialization System
>Affects Versions: 1.0.2
>Reporter: Flavio Pompermaier
>Assignee: Timo Walther
>Priority: Critical
>
> This program cause a StackOverflow (infinite recursion) in the TypeExtractor:
> {code:title=TypeSerializerStackOverflowOnRecursivePojo.java|borderStyle=solid}
> public class TypeSerializerStackOverflowOnRecursivePojo {
>   public static class RecursivePojo implements Serializable {
>   private static final long serialVersionUID = 1L;
>   
>   private RecursivePojo parent;
>   public RecursivePojo(){}
>   public RecursivePojo(K k, V v) {
>   }
>   public RecursivePojo getParent() {
>   return parent;
>   }
>   public void setParent(RecursivePojo parent) {
>   this.parent = parent;
>   }
>   
>   }
>   public static class TypedTuple extends Tuple3 RecursivePojo>>{
>   private static final long serialVersionUID = 1L;
>   }
>   
>   public static void main(String[] args) throws Exception {
>   ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
>   env.fromCollection(Arrays.asList(new RecursivePojo Map>("test",new HashMap(
>   .map(t-> {TypedTuple ret = new TypedTuple();ret.setFields("1", 
> "1", t);return ret;}).returns(TypedTuple.class)
>   .print();
>   }
>   
> }
> {code}
> The thrown Exception is the following:
> {code:title=Exception thrown}
> Exception in thread "main" java.lang.StackOverflowError
>   at 
> sun.reflect.generics.parser.SignatureParser.parsePackageNameAndSimpleClassTypeSignature(SignatureParser.java:328)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseClassTypeSignature(SignatureParser.java:310)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:289)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:283)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseTypeSignature(SignatureParser.java:485)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseReturnType(SignatureParser.java:627)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseMethodTypeSignature(SignatureParser.java:577)
>   at 
> sun.reflect.generics.parser.SignatureParser.parseMethodSig(SignatureParser.java:171)
>   at 
> sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:55)
>   at 
> sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:43)
>   at 
> sun.reflect.generics.repository.AbstractRepository.(AbstractRepository.java:74)
>   at 
> 

[GitHub] flink pull request: [FLINK-3922] [types] Infinite recursion on Typ...

2016-05-19 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/2011

[FLINK-3922] [types] Infinite recursion on TypeExtractor

- [x] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira 
title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the 
JIRA id)

- [x] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [x] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis 
build has passed

Fixes a special bug if a parameterized Pojo contains a recursive field of 
same type.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-3922

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2011.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2011


commit 91b988a240f7a71528ea16c3ffb5d605e0d920de
Author: twalthr 
Date:   2016-05-19T15:01:57Z

[FLINK-3922] [types] Infinite recursion on TypeExtractor




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291243#comment-15291243
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63895157
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import 
org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Jaccard Index measures the similarity between vertex neighborhoods.
+ * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are 
common).
+ * 
+ * This implementation produces similarity scores for each pair of vertices
+ * in the graph with at least one common neighbor; equivalently, this is 
the
+ * set of all non-zero Jaccard Similarity coefficients.
+ * 
+ * The input graph must be a simple, undirected graph containing no 
duplicate
+ * edges or self-loops.
+ *
+ * @param  graph ID type
+ * @param  vertex value type
+ * @param  edge value type
+ */
+public class JaccardIndex, VV, EV>
+implements GraphAlgorithm> {
+
+   public static final int DEFAULT_GROUP_SIZE = 64;
+
+   // Optional configuration
+   private int groupSize = DEFAULT_GROUP_SIZE;
+
+   private boolean unboundedScores = true;
+
+   private int minimumScoreNumerator = 0;
+
+   private int minimumScoreDenominator = 1;
+
+   private int maximumScoreNumerator = 1;
+
+   private int maximumScoreDenominator = 0;
+
+   private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+   /**
+* Override the default group size for the quadratic expansion of 
neighbor
+* pairs. Small groups generate more data whereas large groups 
distribute
+* computation less evenly among tasks.
+*
+* @param groupSize the group size for the quadratic expansion of 
neighbor pairs
+* @return this
+*/
+   public JaccardIndex setGroupSize(int groupSize) {
+   Preconditions.checkArgument(groupSize > 0, "Group size must be 
greater than zero");
+
+   this.groupSize = groupSize;
+
+   return this;
+   }
+
+   /**
+* Filter out Jaccard Index scores less than the given minimum fraction.
+*
+* @param numerator numerator of the minimum score
+* @param denominator denominator of the minimum score
+* @return this
+* @see #setMaximumScore(int, int)
+*/
+   public JaccardIndex setMinimumScore(int numerator, int 
denominator) {
+   Preconditions.checkArgument(numerator >= 

[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63895157
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import 
org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Jaccard Index measures the similarity between vertex neighborhoods.
+ * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are 
common).
+ * 
+ * This implementation produces similarity scores for each pair of vertices
+ * in the graph with at least one common neighbor; equivalently, this is 
the
+ * set of all non-zero Jaccard Similarity coefficients.
+ * 
+ * The input graph must be a simple, undirected graph containing no 
duplicate
+ * edges or self-loops.
+ *
+ * @param  graph ID type
+ * @param  vertex value type
+ * @param  edge value type
+ */
+public class JaccardIndex, VV, EV>
+implements GraphAlgorithm> {
+
+   public static final int DEFAULT_GROUP_SIZE = 64;
+
+   // Optional configuration
+   private int groupSize = DEFAULT_GROUP_SIZE;
+
+   private boolean unboundedScores = true;
+
+   private int minimumScoreNumerator = 0;
+
+   private int minimumScoreDenominator = 1;
+
+   private int maximumScoreNumerator = 1;
+
+   private int maximumScoreDenominator = 0;
+
+   private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+   /**
+* Override the default group size for the quadratic expansion of 
neighbor
+* pairs. Small groups generate more data whereas large groups 
distribute
+* computation less evenly among tasks.
+*
+* @param groupSize the group size for the quadratic expansion of 
neighbor pairs
+* @return this
+*/
+   public JaccardIndex setGroupSize(int groupSize) {
+   Preconditions.checkArgument(groupSize > 0, "Group size must be 
greater than zero");
+
+   this.groupSize = groupSize;
+
+   return this;
+   }
+
+   /**
+* Filter out Jaccard Index scores less than the given minimum fraction.
+*
+* @param numerator numerator of the minimum score
+* @param denominator denominator of the minimum score
+* @return this
+* @see #setMaximumScore(int, int)
+*/
+   public JaccardIndex setMinimumScore(int numerator, int 
denominator) {
+   Preconditions.checkArgument(numerator >= 0, "Minimum score 
numerator must be non-negative");
+   Preconditions.checkArgument(denominator > 0, "Minimum score 
denominator must be greater than zero");
+   Preconditions.checkArgument(numerator <= 

[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291232#comment-15291232
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63894471
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import 
org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Jaccard Index measures the similarity between vertex neighborhoods.
+ * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are 
common).
+ * 
+ * This implementation produces similarity scores for each pair of vertices
+ * in the graph with at least one common neighbor; equivalently, this is 
the
+ * set of all non-zero Jaccard Similarity coefficients.
+ * 
+ * The input graph must be a simple, undirected graph containing no 
duplicate
+ * edges or self-loops.
+ *
+ * @param  graph ID type
+ * @param  vertex value type
+ * @param  edge value type
+ */
+public class JaccardIndex, VV, EV>
+implements GraphAlgorithm> {
+
+   public static final int DEFAULT_GROUP_SIZE = 64;
+
+   // Optional configuration
+   private int groupSize = DEFAULT_GROUP_SIZE;
+
+   private boolean unboundedScores = true;
+
+   private int minimumScoreNumerator = 0;
+
+   private int minimumScoreDenominator = 1;
+
+   private int maximumScoreNumerator = 1;
+
+   private int maximumScoreDenominator = 0;
+
+   private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+   /**
+* Override the default group size for the quadratic expansion of 
neighbor
+* pairs. Small groups generate more data whereas large groups 
distribute
+* computation less evenly among tasks.
+*
+* @param groupSize the group size for the quadratic expansion of 
neighbor pairs
+* @return this
+*/
+   public JaccardIndex setGroupSize(int groupSize) {
+   Preconditions.checkArgument(groupSize > 0, "Group size must be 
greater than zero");
+
+   this.groupSize = groupSize;
+
+   return this;
+   }
+
+   /**
+* Filter out Jaccard Index scores less than the given minimum fraction.
+*
+* @param numerator numerator of the minimum score
+* @param denominator denominator of the minimum score
+* @return this
+* @see #setMaximumScore(int, int)
+*/
+   public JaccardIndex setMinimumScore(int numerator, int 
denominator) {
+   Preconditions.checkArgument(numerator >= 

[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63894471
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import 
org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Jaccard Index measures the similarity between vertex neighborhoods.
+ * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are 
common).
+ * 
+ * This implementation produces similarity scores for each pair of vertices
+ * in the graph with at least one common neighbor; equivalently, this is 
the
+ * set of all non-zero Jaccard Similarity coefficients.
+ * 
+ * The input graph must be a simple, undirected graph containing no 
duplicate
+ * edges or self-loops.
+ *
+ * @param  graph ID type
+ * @param  vertex value type
+ * @param  edge value type
+ */
+public class JaccardIndex, VV, EV>
+implements GraphAlgorithm> {
+
+   public static final int DEFAULT_GROUP_SIZE = 64;
+
+   // Optional configuration
+   private int groupSize = DEFAULT_GROUP_SIZE;
+
+   private boolean unboundedScores = true;
+
+   private int minimumScoreNumerator = 0;
+
+   private int minimumScoreDenominator = 1;
+
+   private int maximumScoreNumerator = 1;
+
+   private int maximumScoreDenominator = 0;
+
+   private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+   /**
+* Override the default group size for the quadratic expansion of 
neighbor
+* pairs. Small groups generate more data whereas large groups 
distribute
+* computation less evenly among tasks.
+*
+* @param groupSize the group size for the quadratic expansion of 
neighbor pairs
+* @return this
+*/
+   public JaccardIndex setGroupSize(int groupSize) {
+   Preconditions.checkArgument(groupSize > 0, "Group size must be 
greater than zero");
+
+   this.groupSize = groupSize;
+
+   return this;
+   }
+
+   /**
+* Filter out Jaccard Index scores less than the given minimum fraction.
+*
+* @param numerator numerator of the minimum score
+* @param denominator denominator of the minimum score
+* @return this
+* @see #setMaximumScore(int, int)
+*/
+   public JaccardIndex setMinimumScore(int numerator, int 
denominator) {
+   Preconditions.checkArgument(numerator >= 0, "Minimum score 
numerator must be non-negative");
+   Preconditions.checkArgument(denominator > 0, "Minimum score 
denominator must be greater than zero");
+   Preconditions.checkArgument(numerator <= 

[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291228#comment-15291228
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63894348
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import 
org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Jaccard Index measures the similarity between vertex neighborhoods.
+ * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are 
common).
+ * 
+ * This implementation produces similarity scores for each pair of vertices
+ * in the graph with at least one common neighbor; equivalently, this is 
the
+ * set of all non-zero Jaccard Similarity coefficients.
+ * 
+ * The input graph must be a simple, undirected graph containing no 
duplicate
+ * edges or self-loops.
+ *
+ * @param  graph ID type
+ * @param  vertex value type
+ * @param  edge value type
+ */
+public class JaccardIndex, VV, EV>
+implements GraphAlgorithm> {
+
+   public static final int DEFAULT_GROUP_SIZE = 64;
+
+   // Optional configuration
+   private int groupSize = DEFAULT_GROUP_SIZE;
+
+   private boolean unboundedScores = true;
+
+   private int minimumScoreNumerator = 0;
+
+   private int minimumScoreDenominator = 1;
+
+   private int maximumScoreNumerator = 1;
+
+   private int maximumScoreDenominator = 0;
+
+   private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+   /**
+* Override the default group size for the quadratic expansion of 
neighbor
+* pairs. Small groups generate more data whereas large groups 
distribute
+* computation less evenly among tasks.
+*
+* @param groupSize the group size for the quadratic expansion of 
neighbor pairs
+* @return this
+*/
+   public JaccardIndex setGroupSize(int groupSize) {
+   Preconditions.checkArgument(groupSize > 0, "Group size must be 
greater than zero");
+
+   this.groupSize = groupSize;
+
+   return this;
+   }
+
+   /**
+* Filter out Jaccard Index scores less than the given minimum fraction.
+*
+* @param numerator numerator of the minimum score
+* @param denominator denominator of the minimum score
+* @return this
+* @see #setMaximumScore(int, int)
+*/
+   public JaccardIndex setMinimumScore(int numerator, int 
denominator) {
+   Preconditions.checkArgument(numerator >= 

[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63894348
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import 
org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Jaccard Index measures the similarity between vertex neighborhoods.
+ * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are 
common).
+ * 
+ * This implementation produces similarity scores for each pair of vertices
+ * in the graph with at least one common neighbor; equivalently, this is 
the
+ * set of all non-zero Jaccard Similarity coefficients.
+ * 
+ * The input graph must be a simple, undirected graph containing no 
duplicate
+ * edges or self-loops.
+ *
+ * @param  graph ID type
+ * @param  vertex value type
+ * @param  edge value type
+ */
+public class JaccardIndex, VV, EV>
+implements GraphAlgorithm> {
+
+   public static final int DEFAULT_GROUP_SIZE = 64;
+
+   // Optional configuration
+   private int groupSize = DEFAULT_GROUP_SIZE;
+
+   private boolean unboundedScores = true;
+
+   private int minimumScoreNumerator = 0;
+
+   private int minimumScoreDenominator = 1;
+
+   private int maximumScoreNumerator = 1;
+
+   private int maximumScoreDenominator = 0;
+
+   private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+   /**
+* Override the default group size for the quadratic expansion of 
neighbor
+* pairs. Small groups generate more data whereas large groups 
distribute
+* computation less evenly among tasks.
+*
+* @param groupSize the group size for the quadratic expansion of 
neighbor pairs
+* @return this
+*/
+   public JaccardIndex setGroupSize(int groupSize) {
+   Preconditions.checkArgument(groupSize > 0, "Group size must be 
greater than zero");
+
+   this.groupSize = groupSize;
+
+   return this;
+   }
+
+   /**
+* Filter out Jaccard Index scores less than the given minimum fraction.
+*
+* @param numerator numerator of the minimum score
+* @param denominator denominator of the minimum score
+* @return this
+* @see #setMaximumScore(int, int)
+*/
+   public JaccardIndex setMinimumScore(int numerator, int 
denominator) {
+   Preconditions.checkArgument(numerator >= 0, "Minimum score 
numerator must be non-negative");
+   Preconditions.checkArgument(denominator > 0, "Minimum score 
denominator must be greater than zero");
+   Preconditions.checkArgument(numerator <= 

[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63894239
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import 
org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Jaccard Index measures the similarity between vertex neighborhoods.
+ * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are 
common).
+ * 
+ * This implementation produces similarity scores for each pair of vertices
+ * in the graph with at least one common neighbor; equivalently, this is 
the
+ * set of all non-zero Jaccard Similarity coefficients.
+ * 
+ * The input graph must be a simple, undirected graph containing no 
duplicate
+ * edges or self-loops.
+ *
+ * @param  graph ID type
+ * @param  vertex value type
+ * @param  edge value type
+ */
+public class JaccardIndex, VV, EV>
+implements GraphAlgorithm> {
+
+   public static final int DEFAULT_GROUP_SIZE = 64;
+
+   // Optional configuration
+   private int groupSize = DEFAULT_GROUP_SIZE;
+
+   private boolean unboundedScores = true;
+
+   private int minimumScoreNumerator = 0;
+
+   private int minimumScoreDenominator = 1;
+
+   private int maximumScoreNumerator = 1;
+
+   private int maximumScoreDenominator = 0;
+
+   private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+   /**
+* Override the default group size for the quadratic expansion of 
neighbor
+* pairs. Small groups generate more data whereas large groups 
distribute
+* computation less evenly among tasks.
+*
+* @param groupSize the group size for the quadratic expansion of 
neighbor pairs
+* @return this
+*/
+   public JaccardIndex setGroupSize(int groupSize) {
+   Preconditions.checkArgument(groupSize > 0, "Group size must be 
greater than zero");
+
+   this.groupSize = groupSize;
+
+   return this;
+   }
+
+   /**
+* Filter out Jaccard Index scores less than the given minimum fraction.
+*
+* @param numerator numerator of the minimum score
+* @param denominator denominator of the minimum score
+* @return this
+* @see #setMaximumScore(int, int)
+*/
+   public JaccardIndex setMinimumScore(int numerator, int 
denominator) {
--- End diff --

This should be documented in the library usage docs too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled 

[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291227#comment-15291227
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63894239
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import 
org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Jaccard Index measures the similarity between vertex neighborhoods.
+ * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are 
common).
+ * 
+ * This implementation produces similarity scores for each pair of vertices
+ * in the graph with at least one common neighbor; equivalently, this is 
the
+ * set of all non-zero Jaccard Similarity coefficients.
+ * 
+ * The input graph must be a simple, undirected graph containing no 
duplicate
+ * edges or self-loops.
+ *
+ * @param  graph ID type
+ * @param  vertex value type
+ * @param  edge value type
+ */
+public class JaccardIndex, VV, EV>
+implements GraphAlgorithm> {
+
+   public static final int DEFAULT_GROUP_SIZE = 64;
+
+   // Optional configuration
+   private int groupSize = DEFAULT_GROUP_SIZE;
+
+   private boolean unboundedScores = true;
+
+   private int minimumScoreNumerator = 0;
+
+   private int minimumScoreDenominator = 1;
+
+   private int maximumScoreNumerator = 1;
+
+   private int maximumScoreDenominator = 0;
+
+   private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+   /**
+* Override the default group size for the quadratic expansion of 
neighbor
+* pairs. Small groups generate more data whereas large groups 
distribute
+* computation less evenly among tasks.
+*
+* @param groupSize the group size for the quadratic expansion of 
neighbor pairs
+* @return this
+*/
+   public JaccardIndex setGroupSize(int groupSize) {
+   Preconditions.checkArgument(groupSize > 0, "Group size must be 
greater than zero");
+
+   this.groupSize = groupSize;
+
+   return this;
+   }
+
+   /**
+* Filter out Jaccard Index scores less than the given minimum fraction.
+*
+* @param numerator numerator of the minimum score
+* @param denominator denominator of the minimum score
+* @return this
+* @see #setMaximumScore(int, int)
+*/
+   public JaccardIndex setMinimumScore(int numerator, int 
denominator) {
--- End diff --

This should be documented in 

[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63894069
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import 
org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Jaccard Index measures the similarity between vertex neighborhoods.
+ * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are 
common).
+ * 
+ * This implementation produces similarity scores for each pair of vertices
+ * in the graph with at least one common neighbor; equivalently, this is 
the
+ * set of all non-zero Jaccard Similarity coefficients.
+ * 
+ * The input graph must be a simple, undirected graph containing no 
duplicate
+ * edges or self-loops.
+ *
+ * @param  graph ID type
+ * @param  vertex value type
+ * @param  edge value type
+ */
+public class JaccardIndex, VV, EV>
+implements GraphAlgorithm> {
+
+   public static final int DEFAULT_GROUP_SIZE = 64;
+
+   // Optional configuration
+   private int groupSize = DEFAULT_GROUP_SIZE;
+
+   private boolean unboundedScores = true;
+
+   private int minimumScoreNumerator = 0;
+
+   private int minimumScoreDenominator = 1;
+
+   private int maximumScoreNumerator = 1;
+
+   private int maximumScoreDenominator = 0;
+
+   private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+   /**
+* Override the default group size for the quadratic expansion of 
neighbor
+* pairs. Small groups generate more data whereas large groups 
distribute
+* computation less evenly among tasks.
+*
+* @param groupSize the group size for the quadratic expansion of 
neighbor pairs
+* @return this
+*/
+   public JaccardIndex setGroupSize(int groupSize) {
--- End diff --

Could you maybe extend the description of what groupSize does? And it would 
be nice to add documentation for this method in the library docs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291225#comment-15291225
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63894069
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
 ---
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.similarity;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import 
org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Jaccard Index measures the similarity between vertex neighborhoods.
+ * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are 
common).
+ * 
+ * This implementation produces similarity scores for each pair of vertices
+ * in the graph with at least one common neighbor; equivalently, this is 
the
+ * set of all non-zero Jaccard Similarity coefficients.
+ * 
+ * The input graph must be a simple, undirected graph containing no 
duplicate
+ * edges or self-loops.
+ *
+ * @param  graph ID type
+ * @param  vertex value type
+ * @param  edge value type
+ */
+public class JaccardIndex, VV, EV>
+implements GraphAlgorithm> {
+
+   public static final int DEFAULT_GROUP_SIZE = 64;
+
+   // Optional configuration
+   private int groupSize = DEFAULT_GROUP_SIZE;
+
+   private boolean unboundedScores = true;
+
+   private int minimumScoreNumerator = 0;
+
+   private int minimumScoreDenominator = 1;
+
+   private int maximumScoreNumerator = 1;
+
+   private int maximumScoreDenominator = 0;
+
+   private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+   /**
+* Override the default group size for the quadratic expansion of 
neighbor
+* pairs. Small groups generate more data whereas large groups 
distribute
+* computation less evenly among tasks.
+*
+* @param groupSize the group size for the quadratic expansion of 
neighbor pairs
+* @return this
+*/
+   public JaccardIndex setGroupSize(int groupSize) {
--- End diff --

Could you maybe extend the description of what groupSize does? And it would 
be nice to add documentation for this method in the library docs.


> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity 
> scores. This algorithm is 

[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291205#comment-15291205
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63892035
  
--- Diff: docs/apis/batch/libs/gelly.md ---
@@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly 
edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
+### Jaccard Index
+
+ Overview
+The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to
+1.0 (all neighbors are common).
+
+ Details
+Counting common neighbors for pairs of vertices is equivalent to counting 
the two-paths consisting of two edges
--- End diff --

I've also seen triad to mean any three vertices and triplet to mean any 
three connected vertices. I'll rework this section.


> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity 
> scores. This algorithm is similar to {{TriangleListing}} but instead of 
> joining two-paths against an edge list we count two-paths.
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which 
> relies on {{Graph.getTriplets()}} so only computes similarity scores for 
> neighbors but not neighbors-of-neighbors.
> This algorithm is easily modified for other similarity scores such as 
> Adamic-Adar similarity where the sum of endpoint degrees is replaced by the 
> degree of the middle vertex.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63893564
  
--- Diff: docs/apis/batch/libs/gelly.md ---
@@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly 
edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
+### Jaccard Index
+
+ Overview
+The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to
+1.0 (all neighbors are common).
+
+ Details
+Counting common neighbors for pairs of vertices is equivalent to counting 
the two-paths consisting of two edges
+connecting the two vertices to the common neighbor. The number of distinct 
neighbors for pairs of vertices is computed
+by storing the sum of degrees of the vertex pair and subtracting the count 
of common neighbors, which are double-counted
+in the sum of degrees.
+
+The algorithm first annotates each edge with the endpoint degree. Grouping 
on the midpoint vertex, each pair of
+neighbors is emitted with the endpoint degree sum. Grouping on two-paths, 
the common neighbors are counted.
+
+ Usage
+The algorithm takes a simple, undirected graph as input and outputs a 
`DataSet` of tuples containing two vertex IDs,
+the number of common neighbors, and the number of distinct neighbors. The 
graph ID type must be `Comparable` and
--- End diff --

Ah great! Can you add this here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291219#comment-15291219
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63893564
  
--- Diff: docs/apis/batch/libs/gelly.md ---
@@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly 
edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
+### Jaccard Index
+
+ Overview
+The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to
+1.0 (all neighbors are common).
+
+ Details
+Counting common neighbors for pairs of vertices is equivalent to counting 
the two-paths consisting of two edges
+connecting the two vertices to the common neighbor. The number of distinct 
neighbors for pairs of vertices is computed
+by storing the sum of degrees of the vertex pair and subtracting the count 
of common neighbors, which are double-counted
+in the sum of degrees.
+
+The algorithm first annotates each edge with the endpoint degree. Grouping 
on the midpoint vertex, each pair of
+neighbors is emitted with the endpoint degree sum. Grouping on two-paths, 
the common neighbors are counted.
+
+ Usage
+The algorithm takes a simple, undirected graph as input and outputs a 
`DataSet` of tuples containing two vertex IDs,
+the number of common neighbors, and the number of distinct neighbors. The 
graph ID type must be `Comparable` and
--- End diff --

Ah great! Can you add this here?


> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity 
> scores. This algorithm is similar to {{TriangleListing}} but instead of 
> joining two-paths against an edge list we count two-paths.
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which 
> relies on {{Graph.getTriplets()}} so only computes similarity scores for 
> neighbors but not neighbors-of-neighbors.
> This algorithm is easily modified for other similarity scores such as 
> Adamic-Adar similarity where the sum of endpoint degrees is replaced by the 
> degree of the middle vertex.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63893365
  
--- Diff: docs/apis/batch/libs/gelly.md ---
@@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly 
edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
+### Jaccard Index
+
+ Overview
+The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to
+1.0 (all neighbors are common).
+
+ Details
+Counting common neighbors for pairs of vertices is equivalent to counting 
the two-paths consisting of two edges
+connecting the two vertices to the common neighbor. The number of distinct 
neighbors for pairs of vertices is computed
+by storing the sum of degrees of the vertex pair and subtracting the count 
of common neighbors, which are double-counted
+in the sum of degrees.
+
+The algorithm first annotates each edge with the endpoint degree. Grouping 
on the midpoint vertex, each pair of
+neighbors is emitted with the endpoint degree sum. Grouping on two-paths, 
the common neighbors are counted.
+
+ Usage
+The algorithm takes a simple, undirected graph as input and outputs a 
`DataSet` of tuples containing two vertex IDs,
+the number of common neighbors, and the number of distinct neighbors. The 
graph ID type must be `Comparable` and
--- End diff --

It does, from `Result.getJaccardIndexScore()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291217#comment-15291217
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63893365
  
--- Diff: docs/apis/batch/libs/gelly.md ---
@@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly 
edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
+### Jaccard Index
+
+ Overview
+The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to
+1.0 (all neighbors are common).
+
+ Details
+Counting common neighbors for pairs of vertices is equivalent to counting 
the two-paths consisting of two edges
+connecting the two vertices to the common neighbor. The number of distinct 
neighbors for pairs of vertices is computed
+by storing the sum of degrees of the vertex pair and subtracting the count 
of common neighbors, which are double-counted
+in the sum of degrees.
+
+The algorithm first annotates each edge with the endpoint degree. Grouping 
on the midpoint vertex, each pair of
+neighbors is emitted with the endpoint degree sum. Grouping on two-paths, 
the common neighbors are counted.
+
+ Usage
+The algorithm takes a simple, undirected graph as input and outputs a 
`DataSet` of tuples containing two vertex IDs,
+the number of common neighbors, and the number of distinct neighbors. The 
graph ID type must be `Comparable` and
--- End diff --

It does, from `Result.getJaccardIndexScore()`.


> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity 
> scores. This algorithm is similar to {{TriangleListing}} but instead of 
> joining two-paths against an edge list we count two-paths.
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which 
> relies on {{Graph.getTriplets()}} so only computes similarity scores for 
> neighbors but not neighbors-of-neighbors.
> This algorithm is easily modified for other similarity scores such as 
> Adamic-Adar similarity where the sum of endpoint degrees is replaced by the 
> degree of the middle vertex.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291216#comment-15291216
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63893177
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/TranslateEdgeDegreeToIntValue.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+
+/**
+ * Translate the edge degree returned by the degree annotation functions 
from
+ * {@link LongValue} to {@link IntValue}.
+ *
+ * @param  edge value type
+ */
+public class TranslateEdgeDegreeToIntValue
--- End diff --

This is a general translator from `LongValue` to `IntValue`, not just for 
edge degrees, right? Maybe we could rename it to demonstrate that?


> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity 
> scores. This algorithm is similar to {{TriangleListing}} but instead of 
> joining two-paths against an edge list we count two-paths.
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which 
> relies on {{Graph.getTriplets()}} so only computes similarity scores for 
> neighbors but not neighbors-of-neighbors.
> This algorithm is easily modified for other similarity scores such as 
> Adamic-Adar similarity where the sum of endpoint degrees is replaced by the 
> degree of the middle vertex.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63893177
  
--- Diff: 
flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/TranslateEdgeDegreeToIntValue.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+
+/**
+ * Translate the edge degree returned by the degree annotation functions 
from
+ * {@link LongValue} to {@link IntValue}.
+ *
+ * @param  edge value type
+ */
+public class TranslateEdgeDegreeToIntValue
--- End diff --

This is a general translator from `LongValue` to `IntValue`, not just for 
edge degrees, right? Maybe we could rename it to demonstrate that?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-05-19 Thread Sebastian Klemke (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sebastian Klemke updated FLINK-3937:

Attachment: improve_flink_cli_yarn_integration.patch

Patch that implements -yid option. Applies cleanly to git rev c71675f

> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> --
>
> Key: FLINK-3937
> URL: https://issues.apache.org/jira/browse/FLINK-3937
> Project: Flink
>  Issue Type: Improvement
>Reporter: Sebastian Klemke
>Priority: Trivial
> Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, flink cli can't figure out JobManager RPC location for 
> Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop 
> subcommands are hard to invoke if you only know the YARN application ID. As 
> an improvement, I suggest adding a -yid  option to the 
> mentioned subcommands that can be used together with -m yarn-cluster. Flink 
> cli would then retrieve JobManager RPC location from YARN ResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291213#comment-15291213
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63892449
  
--- Diff: 
flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Driver for the library implementation of Jaccard Index.
+ *
+ * This example generates an undirected RMat graph with the given scale and
+ * edge factor then calculates all non-zero Jaccard Index similarity scores
+ * between vertices.
+ *
+ * @see org.apache.flink.graph.library.similarity.JaccardIndex
+ */
+public class JaccardIndex {
+
+   public static final int DEFAULT_SCALE = 10;
+
+   public static final int DEFAULT_EDGE_FACTOR = 16;
+
+   public static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+   public static void main(String[] args) throws Exception {
+   // Set up the execution environment
+   final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.getConfig().enableObjectReuse();
+
+   ParameterTool parameters = ParameterTool.fromArgs(args);
+
+   // Generate RMat graph
+   int scale = parameters.getInt("scale", DEFAULT_SCALE);
+   int edgeFactor = parameters.getInt("edge_factor", 
DEFAULT_EDGE_FACTOR);
+
+   RandomGenerableFactory rnd = new 
JDKRandomGeneratorFactory();
+
+   long vertexCount = 1L << scale;
+   long edgeCount = vertexCount * edgeFactor;
+
+   boolean clipAndFlip = parameters.getBoolean("clip_and_flip", 
DEFAULT_CLIP_AND_FLIP);
+
+   Graph graph = new 
RMatGraph<>(env, rnd, vertexCount, edgeCount)
+   .setSimpleGraph(true, clipAndFlip)
+   .generate();
+
+   DataSet js;
+
+   if (scale > 32) {
+   js = graph
+   .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex());
+   } else {
+   js = graph
+   .run(new TranslateGraphIds(new LongValueToIntValue()))
+   .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex());
+   }
+
+   switch (parameters.get("output", "")) {
--- End diff --

Can you add a description for these output parameter options in the class 
Javadoc?


> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: 

[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63892449
  
--- Diff: 
flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Driver for the library implementation of Jaccard Index.
+ *
+ * This example generates an undirected RMat graph with the given scale and
+ * edge factor then calculates all non-zero Jaccard Index similarity scores
+ * between vertices.
+ *
+ * @see org.apache.flink.graph.library.similarity.JaccardIndex
+ */
+public class JaccardIndex {
+
+   public static final int DEFAULT_SCALE = 10;
+
+   public static final int DEFAULT_EDGE_FACTOR = 16;
+
+   public static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+   public static void main(String[] args) throws Exception {
+   // Set up the execution environment
+   final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+   env.getConfig().enableObjectReuse();
+
+   ParameterTool parameters = ParameterTool.fromArgs(args);
+
+   // Generate RMat graph
+   int scale = parameters.getInt("scale", DEFAULT_SCALE);
+   int edgeFactor = parameters.getInt("edge_factor", 
DEFAULT_EDGE_FACTOR);
+
+   RandomGenerableFactory rnd = new 
JDKRandomGeneratorFactory();
+
+   long vertexCount = 1L << scale;
+   long edgeCount = vertexCount * edgeFactor;
+
+   boolean clipAndFlip = parameters.getBoolean("clip_and_flip", 
DEFAULT_CLIP_AND_FLIP);
+
+   Graph graph = new 
RMatGraph<>(env, rnd, vertexCount, edgeCount)
+   .setSimpleGraph(true, clipAndFlip)
+   .generate();
+
+   DataSet js;
+
+   if (scale > 32) {
+   js = graph
+   .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex());
+   } else {
+   js = graph
+   .run(new TranslateGraphIds(new LongValueToIntValue()))
+   .run(new 
org.apache.flink.graph.library.similarity.JaccardIndex());
+   }
+
+   switch (parameters.get("output", "")) {
--- End diff --

Can you add a description for these output parameter options in the class 
Javadoc?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters

2016-05-19 Thread Sebastian Klemke (JIRA)
Sebastian Klemke created FLINK-3937:
---

 Summary: Make flink cli list, savepoint, cancel and stop work on 
Flink-on-YARN clusters
 Key: FLINK-3937
 URL: https://issues.apache.org/jira/browse/FLINK-3937
 Project: Flink
  Issue Type: Improvement
Reporter: Sebastian Klemke
Priority: Trivial


Currently, flink cli can't figure out JobManager RPC location for Flink-on-YARN 
clusters. Therefore, list, savepoint, cancel and stop subcommands are hard to 
invoke if you only know the YARN application ID. As an improvement, I suggest 
adding a -yid  option to the mentioned subcommands that can 
be used together with -m yarn-cluster. Flink cli would then retrieve JobManager 
RPC location from YARN ResourceManager.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291206#comment-15291206
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63892061
  
--- Diff: 
flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Driver for the library implementation of Jaccard Index.
+ *
+ * This example generates an undirected RMat graph with the given scale and
--- End diff --

The rest of the examples in Gelly (and all other Flink components) are 
designed so that they can be run both with user-specified inputs and without 
(default data). Could you please modify this example to work in a similar way?


> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity 
> scores. This algorithm is similar to {{TriangleListing}} but instead of 
> joining two-paths against an edge list we count two-paths.
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which 
> relies on {{Graph.getTriplets()}} so only computes similarity scores for 
> neighbors but not neighbors-of-neighbors.
> This algorithm is easily modified for other similarity scores such as 
> Adamic-Adar similarity where the sum of endpoint degrees is replaced by the 
> degree of the middle vertex.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63892035
  
--- Diff: docs/apis/batch/libs/gelly.md ---
@@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly 
edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
+### Jaccard Index
+
+ Overview
+The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to
+1.0 (all neighbors are common).
+
+ Details
+Counting common neighbors for pairs of vertices is equivalent to counting 
the two-paths consisting of two edges
--- End diff --

I've also seen triad to mean any three vertices and triplet to mean any 
three connected vertices. I'll rework this section.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63892061
  
--- Diff: 
flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Driver for the library implementation of Jaccard Index.
+ *
+ * This example generates an undirected RMat graph with the given scale and
--- End diff --

The rest of the examples in Gelly (and all other Flink components) are 
designed so that they can be run both with user-specified inputs and without 
(default data). Could you please modify this example to work in a similar way?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291192#comment-15291192
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63890500
  
--- Diff: docs/apis/batch/libs/gelly.md ---
@@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly 
edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
+### Jaccard Index
+
+ Overview
+The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to
+1.0 (all neighbors are common).
+
+ Details
+Counting common neighbors for pairs of vertices is equivalent to counting 
the two-paths consisting of two edges
--- End diff --

By "two-paths" you mean triads? i.e. open triangles?


> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity 
> scores. This algorithm is similar to {{TriangleListing}} but instead of 
> joining two-paths against an edge list we count two-paths.
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which 
> relies on {{Graph.getTriplets()}} so only computes similarity scores for 
> neighbors but not neighbors-of-neighbors.
> This algorithm is easily modified for other similarity scores such as 
> Adamic-Adar similarity where the sum of endpoint degrees is replaced by the 
> degree of the middle vertex.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291197#comment-15291197
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63891097
  
--- Diff: docs/apis/batch/libs/gelly.md ---
@@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly 
edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
+### Jaccard Index
+
+ Overview
+The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to
+1.0 (all neighbors are common).
+
+ Details
+Counting common neighbors for pairs of vertices is equivalent to counting 
the two-paths consisting of two edges
+connecting the two vertices to the common neighbor. The number of distinct 
neighbors for pairs of vertices is computed
+by storing the sum of degrees of the vertex pair and subtracting the count 
of common neighbors, which are double-counted
+in the sum of degrees.
+
+The algorithm first annotates each edge with the endpoint degree. Grouping 
on the midpoint vertex, each pair of
+neighbors is emitted with the endpoint degree sum. Grouping on two-paths, 
the common neighbors are counted.
+
+ Usage
+The algorithm takes a simple, undirected graph as input and outputs a 
`DataSet` of tuples containing two vertex IDs,
+the number of common neighbors, and the number of distinct neighbors. The 
graph ID type must be `Comparable` and
--- End diff --

Why doesn't the algorithm return the jaccard index?


> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity 
> scores. This algorithm is similar to {{TriangleListing}} but instead of 
> joining two-paths against an edge list we count two-paths.
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which 
> relies on {{Graph.getTriplets()}} so only computes similarity scores for 
> neighbors but not neighbors-of-neighbors.
> This algorithm is easily modified for other similarity scores such as 
> Adamic-Adar similarity where the sum of endpoint degrees is replaced by the 
> degree of the middle vertex.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63891097
  
--- Diff: docs/apis/batch/libs/gelly.md ---
@@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly 
edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
+### Jaccard Index
+
+ Overview
+The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to
+1.0 (all neighbors are common).
+
+ Details
+Counting common neighbors for pairs of vertices is equivalent to counting 
the two-paths consisting of two edges
+connecting the two vertices to the common neighbor. The number of distinct 
neighbors for pairs of vertices is computed
+by storing the sum of degrees of the vertex pair and subtracting the count 
of common neighbors, which are double-counted
+in the sum of degrees.
+
+The algorithm first annotates each edge with the endpoint degree. Grouping 
on the midpoint vertex, each pair of
+neighbors is emitted with the endpoint degree sum. Grouping on two-paths, 
the common neighbors are counted.
+
+ Usage
+The algorithm takes a simple, undirected graph as input and outputs a 
`DataSet` of tuples containing two vertex IDs,
+the number of common neighbors, and the number of distinct neighbors. The 
graph ID type must be `Comparable` and
--- End diff --

Why doesn't the algorithm return the jaccard index?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291195#comment-15291195
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63890887
  
--- Diff: docs/apis/batch/libs/gelly.md ---
@@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly 
edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
+### Jaccard Index
+
+ Overview
+The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to
+1.0 (all neighbors are common).
+
+ Details
+Counting common neighbors for pairs of vertices is equivalent to counting 
the two-paths consisting of two edges
+connecting the two vertices to the common neighbor. The number of distinct 
neighbors for pairs of vertices is computed
+by storing the sum of degrees of the vertex pair and subtracting the count 
of common neighbors, which are double-counted
+in the sum of degrees.
+
+The algorithm first annotates each edge with the endpoint degree. Grouping 
on the midpoint vertex, each pair of
--- End diff --

Source, target or both endpoints? What is a "midpoint" vertex? Is this 
standard terminology? It might help to give a small example here.


> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity 
> scores. This algorithm is similar to {{TriangleListing}} but instead of 
> joining two-paths against an edge list we count two-paths.
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which 
> relies on {{Graph.getTriplets()}} so only computes similarity scores for 
> neighbors but not neighbors-of-neighbors.
> This algorithm is easily modified for other similarity scores such as 
> Adamic-Adar similarity where the sum of endpoint degrees is replaced by the 
> degree of the middle vertex.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63890887
  
--- Diff: docs/apis/batch/libs/gelly.md ---
@@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly 
edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
+### Jaccard Index
+
+ Overview
+The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to
+1.0 (all neighbors are common).
+
+ Details
+Counting common neighbors for pairs of vertices is equivalent to counting 
the two-paths consisting of two edges
+connecting the two vertices to the common neighbor. The number of distinct 
neighbors for pairs of vertices is computed
+by storing the sum of degrees of the vertex pair and subtracting the count 
of common neighbors, which are double-counted
+in the sum of degrees.
+
+The algorithm first annotates each edge with the endpoint degree. Grouping 
on the midpoint vertex, each pair of
--- End diff --

Source, target or both endpoints? What is a "midpoint" vertex? Is this 
standard terminology? It might help to give a small example here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3780) Jaccard Similarity

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291193#comment-15291193
 ] 

ASF GitHub Bot commented on FLINK-3780:
---

Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63890630
  
--- Diff: docs/apis/batch/libs/gelly.md ---
@@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly 
edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
+### Jaccard Index
+
+ Overview
+The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to
+1.0 (all neighbors are common).
+
+ Details
+Counting common neighbors for pairs of vertices is equivalent to counting 
the two-paths consisting of two edges
+connecting the two vertices to the common neighbor. The number of distinct 
neighbors for pairs of vertices is computed
--- End diff --

distinct _common_ neighbors?


> Jaccard Similarity
> --
>
> Key: FLINK-3780
> URL: https://issues.apache.org/jira/browse/FLINK-3780
> Project: Flink
>  Issue Type: New Feature
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
> Fix For: 1.1.0
>
>
> Implement a Jaccard Similarity algorithm computing all non-zero similarity 
> scores. This algorithm is similar to {{TriangleListing}} but instead of 
> joining two-paths against an edge list we count two-paths.
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which 
> relies on {{Graph.getTriplets()}} so only computes similarity scores for 
> neighbors but not neighbors-of-neighbors.
> This algorithm is easily modified for other similarity scores such as 
> Adamic-Adar similarity where the sum of endpoint degrees is replaced by the 
> degree of the middle vertex.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63890630
  
--- Diff: docs/apis/batch/libs/gelly.md ---
@@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly 
edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
+### Jaccard Index
+
+ Overview
+The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to
+1.0 (all neighbors are common).
+
+ Details
+Counting common neighbors for pairs of vertices is equivalent to counting 
the two-paths consisting of two edges
+connecting the two vertices to the common neighbor. The number of distinct 
neighbors for pairs of vertices is computed
--- End diff --

distinct _common_ neighbors?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity

2016-05-19 Thread vasia
Github user vasia commented on a diff in the pull request:

https://github.com/apache/flink/pull/1980#discussion_r63890500
  
--- Diff: docs/apis/batch/libs/gelly.md ---
@@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly 
edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of 
edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the 
number of represented elements.
 
+### Jaccard Index
+
+ Overview
+The Jaccard Index measures the similarity between vertex neighborhoods. 
Scores range from 0.0 (no common neighbors) to
+1.0 (all neighbors are common).
+
+ Details
+Counting common neighbors for pairs of vertices is equivalent to counting 
the two-paths consisting of two edges
--- End diff --

By "two-paths" you mean triads? i.e. open triangles?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Assigned] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-05-19 Thread Vijay Srinivasaraghavan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vijay Srinivasaraghavan reassigned FLINK-3929:
--

Assignee: Vijay Srinivasaraghavan  (was: Eron Wright )

> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291134#comment-15291134
 ] 

ASF GitHub Bot commented on FLINK-1745:
---

Github user danielblazevski commented on a diff in the pull request:

https://github.com/apache/flink/pull/1220#discussion_r63882747
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.nn
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common._
+import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector}
+import 
org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, 
DistanceMetric,
+EuclideanDistanceMetric}
+import org.apache.flink.ml.pipeline.{FitOperation, 
PredictDataSetOperation, Predictor}
+import org.apache.flink.util.Collector
+import 
org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
+
+import scala.collection.immutable.Vector
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+/** Implements a k-nearest neighbor join.
+  *
+  * Calculates the `k`-nearest neighbor points in the training set for 
each point in the test set.
+  *
+  * @example
+  * {{{
+  * val trainingDS: DataSet[Vector] = ...
+  * val testingDS: DataSet[Vector] = ...
+  *
+  * val knn = KNN()
+  *   .setK(10)
+  *   .setBlocks(5)
+  *   .setDistanceMetric(EuclideanDistanceMetric())
+  *
+  * knn.fit(trainingDS)
+  *
+  * val predictionDS: DataSet[(Vector, Array[Vector])] = 
knn.predict(testingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[org.apache.flink.ml.nn.KNN.K]]
+  * Sets the K which is the number of selected points as neighbors. 
(Default value: '''5''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.DistanceMetric]]
+  * Sets the distance metric we use to calculate the distance between two 
points. If no metric is
+  * specified, then 
[[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used.
+  * (Default value: '''EuclideanDistanceMetric()''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.Blocks]]
+  * Sets the number of blocks into which the input data will be split. 
This number should be set
+  * at least to the degree of parallelism. If no value is specified, then 
the parallelism of the
+  * input [[DataSet]] is used as the number of blocks. (Default value: 
'''None''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.UseQuadTreeParam]]
+  * A boolean variable that whether or not to use a Quadtree to partition 
the training set
+  * to potentially simplify the KNN search.  If no value is specified, the 
code will
+  * automatically decide whether or not to use a Quadtree.  Use of a 
Quadtree scales well
+  * with the number of training and testing points, though poorly with the 
dimension.
+  * (Default value:  ```None```)
+  *
+  * - [[org.apache.flink.ml.nn.KNN.SizeHint]]
+  * Specifies whether the training set or test set is small to optimize 
the cross
+  * product operation needed for the KNN search.  If the training set is 
small
+  * this should be `CrossHint.FIRST_IS_SMALL` and set to 
`CrossHint.SECOND_IS_SMALL`
+  * if the test set is small.
+  * (Default value:  ```None```)
+  *
+  */
+
+class KNN extends Predictor[KNN] {
+
+  import KNN._
+
+  var trainingSet: Option[DataSet[Block[FlinkVector]]] = None
+
+  /** Sets K
+* @param k the number of selected points as neighbors
+*/
+  def setK(k: Int): KNN = {
+require(k > 0, "K 

[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...

2016-05-19 Thread danielblazevski
Github user danielblazevski commented on a diff in the pull request:

https://github.com/apache/flink/pull/1220#discussion_r63882747
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.nn
+
+import org.apache.flink.api.common.operators.Order
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.utils._
+import org.apache.flink.api.scala._
+import org.apache.flink.ml.common._
+import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector}
+import 
org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, 
DistanceMetric,
+EuclideanDistanceMetric}
+import org.apache.flink.ml.pipeline.{FitOperation, 
PredictDataSetOperation, Predictor}
+import org.apache.flink.util.Collector
+import 
org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint
+
+import scala.collection.immutable.Vector
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
+/** Implements a k-nearest neighbor join.
+  *
+  * Calculates the `k`-nearest neighbor points in the training set for 
each point in the test set.
+  *
+  * @example
+  * {{{
+  * val trainingDS: DataSet[Vector] = ...
+  * val testingDS: DataSet[Vector] = ...
+  *
+  * val knn = KNN()
+  *   .setK(10)
+  *   .setBlocks(5)
+  *   .setDistanceMetric(EuclideanDistanceMetric())
+  *
+  * knn.fit(trainingDS)
+  *
+  * val predictionDS: DataSet[(Vector, Array[Vector])] = 
knn.predict(testingDS)
+  * }}}
+  *
+  * =Parameters=
+  *
+  * - [[org.apache.flink.ml.nn.KNN.K]]
+  * Sets the K which is the number of selected points as neighbors. 
(Default value: '''5''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.DistanceMetric]]
+  * Sets the distance metric we use to calculate the distance between two 
points. If no metric is
+  * specified, then 
[[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used.
+  * (Default value: '''EuclideanDistanceMetric()''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.Blocks]]
+  * Sets the number of blocks into which the input data will be split. 
This number should be set
+  * at least to the degree of parallelism. If no value is specified, then 
the parallelism of the
+  * input [[DataSet]] is used as the number of blocks. (Default value: 
'''None''')
+  *
+  * - [[org.apache.flink.ml.nn.KNN.UseQuadTreeParam]]
+  * A boolean variable that whether or not to use a Quadtree to partition 
the training set
+  * to potentially simplify the KNN search.  If no value is specified, the 
code will
+  * automatically decide whether or not to use a Quadtree.  Use of a 
Quadtree scales well
+  * with the number of training and testing points, though poorly with the 
dimension.
+  * (Default value:  ```None```)
+  *
+  * - [[org.apache.flink.ml.nn.KNN.SizeHint]]
+  * Specifies whether the training set or test set is small to optimize 
the cross
+  * product operation needed for the KNN search.  If the training set is 
small
+  * this should be `CrossHint.FIRST_IS_SMALL` and set to 
`CrossHint.SECOND_IS_SMALL`
+  * if the test set is small.
+  * (Default value:  ```None```)
+  *
+  */
+
+class KNN extends Predictor[KNN] {
+
+  import KNN._
+
+  var trainingSet: Option[DataSet[Block[FlinkVector]]] = None
+
+  /** Sets K
+* @param k the number of selected points as neighbors
+*/
+  def setK(k: Int): KNN = {
+require(k > 0, "K must be positive.")
+parameters.add(K, k)
+this
+  }
+
+  /** Sets the distance metric
+* @param metric the distance metric to calculate distance between two 
points
+*/
+  def 

[GitHub] flink pull request: [FLINK-3909] update Maven Failsafe version

2016-05-19 Thread mxm
Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/2003#issuecomment-220321817
  
Thanks for looking into the problem and providing a detailed explanation 
@markreddy. I think we're good to go with the newest version of the Surefire 
plugin. I verified the tests 1) fail when errors occur 2) are actually run 
(added some debug output) 3) the number of tests/ITCases remains unchanged.

Just running one more time because I forgot to set the default of 
`reuseForks` to false for the integration tests. Reusing of forks doesn't work 
for some tests because of garbage collection problems (this was the same for 
the Failsafe plugin). Apparently almost all tests pass. So looking forward to 
merging this soon.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3909) Maven Failsafe plugin may report SUCCESS on failed tests

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291081#comment-15291081
 ] 

ASF GitHub Bot commented on FLINK-3909:
---

Github user mxm commented on the pull request:

https://github.com/apache/flink/pull/2003#issuecomment-220321817
  
Thanks for looking into the problem and providing a detailed explanation 
@markreddy. I think we're good to go with the newest version of the Surefire 
plugin. I verified the tests 1) fail when errors occur 2) are actually run 
(added some debug output) 3) the number of tests/ITCases remains unchanged.

Just running one more time because I forgot to set the default of 
`reuseForks` to false for the integration tests. Reusing of forks doesn't work 
for some tests because of garbage collection problems (this was the same for 
the Failsafe plugin). Apparently almost all tests pass. So looking forward to 
merging this soon.


> Maven Failsafe plugin may report SUCCESS on failed tests
> 
>
> Key: FLINK-3909
> URL: https://issues.apache.org/jira/browse/FLINK-3909
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>
> The following build completed successfully on Travis but there are actually 
> test failures: https://travis-ci.org/apache/flink/jobs/129943398#L5402



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3936) Add MIN / MAX aggregations function for BOOLEAN types

2016-05-19 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-3936:


 Summary: Add MIN / MAX aggregations function for BOOLEAN types
 Key: FLINK-3936
 URL: https://issues.apache.org/jira/browse/FLINK-3936
 Project: Flink
  Issue Type: Improvement
  Components: Table API
Affects Versions: 1.1.0
Reporter: Fabian Hueske
 Fix For: 1.1.0


When executing TPC-H Q4, I observed that Calcite generates a MIN aggregate on 
Boolean literals to translate a decorrelate subquery in an {{EXIST}} predicate.

MIN and MAX aggregates on Boolean data types are currently not supported and 
should be added.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291073#comment-15291073
 ] 

ASF GitHub Bot commented on FLINK-1502:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1947#issuecomment-220319335
  
Let me grab the token for this one. There are a few things still, like 
resource leaks, in this code.
I'll pass you back the token as soon as I am done...


> Expose metrics to graphite, ganglia and JMX.
> 
>
> Key: FLINK-1502
> URL: https://issues.apache.org/jira/browse/FLINK-1502
> Project: Flink
>  Issue Type: Sub-task
>  Components: JobManager, TaskManager
>Affects Versions: 0.9
>Reporter: Robert Metzger
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: pre-apache
>
>
> The metrics library allows to expose collected metrics easily to other 
> systems such as graphite, ganglia or Java's JVM (VisualVM).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1502] [WIP] Basic Metric System

2016-05-19 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1947#issuecomment-220319335
  
Let me grab the token for this one. There are a few things still, like 
resource leaks, in this code.
I'll pass you back the token as soon as I am done...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2220) Join on Pojo without hashCode() silently fails

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291048#comment-15291048
 ] 

ASF GitHub Bot commented on FLINK-2220:
---

Github user gallenvara closed the pull request at:

https://github.com/apache/flink/pull/1940


> Join on Pojo without hashCode() silently fails
> --
>
> Key: FLINK-2220
> URL: https://issues.apache.org/jira/browse/FLINK-2220
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 0.9, 0.8.1
>Reporter: Marcus Leich
>
> I need to perform a join using a complete Pojo as join key.
> With DOP > 1 this only works if the Pojo comes with a meaningful hasCode() 
> implementation, as otherwise equal objects will get hashed to different 
> partitions based on their memory address and not on the content.
> I guess it's fine if users are required to implement hasCode() themselves, 
> but it would be nice of documentation or better yet, Flink itself could alert 
> users that this is a requirement, similar to how Comparable is required for 
> keys.
> Use the following code to reproduce the issue:
> public class Pojo implements Comparable {
> public byte[] data;
> public Pojo () {
> }
> public Pojo (byte[] data) {
> this.data = data;
> }
> @Override
> public int compareTo(Pojo o) {
> return UnsignedBytes.lexicographicalComparator().compare(data, 
> o.data);
> }
> // uncomment me for making the join work
> /* @Override
> public int hashCode() {
> return Arrays.hashCode(data);
> }*/
> }
> public void testJoin () throws Exception {
> final ExecutionEnvironment env = 
> ExecutionEnvironment.createLocalEnvironment();
> env.setParallelism(4);
> DataSet> left = env.fromElements(
> new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "black"),
> new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}), "red"),
> new Tuple2<>(new Pojo(new byte[] {1}), "Spark"),
> new Tuple2<>(new Pojo(new byte[] {2}), "good"),
> new Tuple2<>(new Pojo(new byte[] {5}), "bug"));
> DataSet> right = env.fromElements(
> new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "white"),
> new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}), 
> "green"),
> new Tuple2<>(new Pojo(new byte[] {1}), "Flink"),
> new Tuple2<>(new Pojo(new byte[] {2}), "evil"),
> new Tuple2<>(new Pojo(new byte[] {5}), "fix"));
> // will not print anything unless Pojo has a real hashCode() 
> implementation
> 
> left.join(right).where(0).equalTo(0).projectFirst(1).projectSecond(1).print();
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2220) Join on Pojo without hashCode() silently fails

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291045#comment-15291045
 ] 

ASF GitHub Bot commented on FLINK-2220:
---

Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1940#issuecomment-220316125
  
@fhueske i can't reproduce the issue in another computer. And I'm not sure 
if it's because I forgot the default constructor (no parameter) that led to the 
problem existing. I sent email to the reporter and not responsed. I will close 
the PR. Thanks.


> Join on Pojo without hashCode() silently fails
> --
>
> Key: FLINK-2220
> URL: https://issues.apache.org/jira/browse/FLINK-2220
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 0.9, 0.8.1
>Reporter: Marcus Leich
>
> I need to perform a join using a complete Pojo as join key.
> With DOP > 1 this only works if the Pojo comes with a meaningful hasCode() 
> implementation, as otherwise equal objects will get hashed to different 
> partitions based on their memory address and not on the content.
> I guess it's fine if users are required to implement hasCode() themselves, 
> but it would be nice of documentation or better yet, Flink itself could alert 
> users that this is a requirement, similar to how Comparable is required for 
> keys.
> Use the following code to reproduce the issue:
> public class Pojo implements Comparable {
> public byte[] data;
> public Pojo () {
> }
> public Pojo (byte[] data) {
> this.data = data;
> }
> @Override
> public int compareTo(Pojo o) {
> return UnsignedBytes.lexicographicalComparator().compare(data, 
> o.data);
> }
> // uncomment me for making the join work
> /* @Override
> public int hashCode() {
> return Arrays.hashCode(data);
> }*/
> }
> public void testJoin () throws Exception {
> final ExecutionEnvironment env = 
> ExecutionEnvironment.createLocalEnvironment();
> env.setParallelism(4);
> DataSet> left = env.fromElements(
> new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "black"),
> new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}), "red"),
> new Tuple2<>(new Pojo(new byte[] {1}), "Spark"),
> new Tuple2<>(new Pojo(new byte[] {2}), "good"),
> new Tuple2<>(new Pojo(new byte[] {5}), "bug"));
> DataSet> right = env.fromElements(
> new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "white"),
> new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}), 
> "green"),
> new Tuple2<>(new Pojo(new byte[] {1}), "Flink"),
> new Tuple2<>(new Pojo(new byte[] {2}), "evil"),
> new Tuple2<>(new Pojo(new byte[] {5}), "fix"));
> // will not print anything unless Pojo has a real hashCode() 
> implementation
> 
> left.join(right).where(0).equalTo(0).projectFirst(1).projectSecond(1).print();
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2220] Join on Pojo without hashCode() s...

2016-05-19 Thread gallenvara
Github user gallenvara commented on the pull request:

https://github.com/apache/flink/pull/1940#issuecomment-220316125
  
@fhueske i can't reproduce the issue in another computer. And I'm not sure 
if it's because I forgot the default constructor (no parameter) that led to the 
problem existing. I sent email to the reporter and not responsed. I will close 
the PR. Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291038#comment-15291038
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r63871737
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 ---
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
+ * database.
+ * 
+ * Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
+   private ClusterBuilder builder;
+   private transient Cluster cluster;
+   private transient Session session;
+
+   private String keySpace = "flink_auxiliary";
+   private String table = "checkpoints_";
+
+   private transient PreparedStatement deleteStatement;
+   private transient PreparedStatement updateStatement;
+   private transient PreparedStatement selectStatement;
+
+   public CassandraCommitter(ClusterBuilder builder) {
+   this.builder = builder;
+   ClosureCleaner.clean(builder, true);
+   }
+
+   public CassandraCommitter(ClusterBuilder builder, String keySpace) {
+   this(builder);
+   this.keySpace = keySpace;
+   }
+
+   /**
+* Internally used to set the job ID after instantiation.
+*
+* @param id
+* @throws Exception
+*/
+   public void setJobId(String id) throws Exception {
+   super.setJobId(id);
+   table += id;
+   }
+
+   /**
+* Generates the necessary tables to store information.
+*
+* @return
+* @throws Exception
+*/
+   @Override
+   public void createResource() throws Exception {
+   cluster = builder.getCluster();
+   session = cluster.connect();
+
+   session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s 
with replication={'class':'SimpleStrategy', 'replication_factor':3};", 
keySpace));
+   session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s 
(sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, 
sub_id));", keySpace, table));
+
+   try {
+   session.close();
+   } catch (Exception e) {
+   LOG.error("Error while closing session.", e);
--- End diff --

correct.


> Add a connector for streaming data into Cassandra
> -
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed java client for connecting into 
> Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local cassandra instance 
> (for the tests): 
> http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should support TupleX types, and map standard 
> java types to the respective cassandra types.
> In 

[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-05-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r63871737
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 ---
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
+ * database.
+ * 
+ * Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
+   private ClusterBuilder builder;
+   private transient Cluster cluster;
+   private transient Session session;
+
+   private String keySpace = "flink_auxiliary";
+   private String table = "checkpoints_";
+
+   private transient PreparedStatement deleteStatement;
+   private transient PreparedStatement updateStatement;
+   private transient PreparedStatement selectStatement;
+
+   public CassandraCommitter(ClusterBuilder builder) {
+   this.builder = builder;
+   ClosureCleaner.clean(builder, true);
+   }
+
+   public CassandraCommitter(ClusterBuilder builder, String keySpace) {
+   this(builder);
+   this.keySpace = keySpace;
+   }
+
+   /**
+* Internally used to set the job ID after instantiation.
+*
+* @param id
+* @throws Exception
+*/
+   public void setJobId(String id) throws Exception {
+   super.setJobId(id);
+   table += id;
+   }
+
+   /**
+* Generates the necessary tables to store information.
+*
+* @return
+* @throws Exception
+*/
+   @Override
+   public void createResource() throws Exception {
+   cluster = builder.getCluster();
+   session = cluster.connect();
+
+   session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s 
with replication={'class':'SimpleStrategy', 'replication_factor':3};", 
keySpace));
+   session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s 
(sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, 
sub_id));", keySpace, table));
+
+   try {
+   session.close();
+   } catch (Exception e) {
+   LOG.error("Error while closing session.", e);
--- End diff --

correct.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3933) Add an auto-type-extracting DeserializationSchema

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291024#comment-15291024
 ] 

ASF GitHub Bot commented on FLINK-3933:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2010#discussion_r63869803
  
--- Diff: docs/apis/streaming/connectors/kafka.md ---
@@ -142,18 +142,24 @@ for querying the list of topics and partitions.
 For this to work, the consumer needs to be able to access the consumers 
from the machine submitting the job to the Flink cluster.
 If you experience any issues with the Kafka consumer on the client side, 
the client log might contain information about failed requests, etc.
 
-# The `DeserializationSchema`
+# **The `DeserializationSchema`**
--- End diff --

`kafka.md` uses very low level headings. The batch and streaming guides use 
level 2 headings for top-level headings while `kafka.md` uses level 4. I think 
the solution is to reduce heading level by 2 on all levels in this document, 
which would yield
```
### The `DeserializationSchema`
```


> Add an auto-type-extracting DeserializationSchema
> -
>
> Key: FLINK-3933
> URL: https://issues.apache.org/jira/browse/FLINK-3933
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.1.0
>
>
> When creating a {{DeserializationSchema}}, people need to manually worry 
> about how to provide the produced type's {{TypeInformation}}.
> We should add a base utility {{AbstractDeserializationSchema}} that provides 
> that automatically via the type extractor.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3933] [streaming API] Add AbstractDeser...

2016-05-19 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2010#discussion_r63869803
  
--- Diff: docs/apis/streaming/connectors/kafka.md ---
@@ -142,18 +142,24 @@ for querying the list of topics and partitions.
 For this to work, the consumer needs to be able to access the consumers 
from the machine submitting the job to the Flink cluster.
 If you experience any issues with the Kafka consumer on the client side, 
the client log might contain information about failed requests, etc.
 
-# The `DeserializationSchema`
+# **The `DeserializationSchema`**
--- End diff --

`kafka.md` uses very low level headings. The batch and streaming guides use 
level 2 headings for top-level headings while `kafka.md` uses level 4. I think 
the solution is to reduce heading level by 2 on all levels in this document, 
which would yield
```
### The `DeserializationSchema`
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2220] Join on Pojo without hashCode() s...

2016-05-19 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1940#issuecomment-220310633
  
Any update for this PR, @gallenvara?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2220) Join on Pojo without hashCode() silently fails

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291021#comment-15291021
 ] 

ASF GitHub Bot commented on FLINK-2220:
---

Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/1940#issuecomment-220310633
  
Any update for this PR, @gallenvara?


> Join on Pojo without hashCode() silently fails
> --
>
> Key: FLINK-2220
> URL: https://issues.apache.org/jira/browse/FLINK-2220
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 0.9, 0.8.1
>Reporter: Marcus Leich
>
> I need to perform a join using a complete Pojo as join key.
> With DOP > 1 this only works if the Pojo comes with a meaningful hasCode() 
> implementation, as otherwise equal objects will get hashed to different 
> partitions based on their memory address and not on the content.
> I guess it's fine if users are required to implement hasCode() themselves, 
> but it would be nice of documentation or better yet, Flink itself could alert 
> users that this is a requirement, similar to how Comparable is required for 
> keys.
> Use the following code to reproduce the issue:
> public class Pojo implements Comparable {
> public byte[] data;
> public Pojo () {
> }
> public Pojo (byte[] data) {
> this.data = data;
> }
> @Override
> public int compareTo(Pojo o) {
> return UnsignedBytes.lexicographicalComparator().compare(data, 
> o.data);
> }
> // uncomment me for making the join work
> /* @Override
> public int hashCode() {
> return Arrays.hashCode(data);
> }*/
> }
> public void testJoin () throws Exception {
> final ExecutionEnvironment env = 
> ExecutionEnvironment.createLocalEnvironment();
> env.setParallelism(4);
> DataSet> left = env.fromElements(
> new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "black"),
> new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}), "red"),
> new Tuple2<>(new Pojo(new byte[] {1}), "Spark"),
> new Tuple2<>(new Pojo(new byte[] {2}), "good"),
> new Tuple2<>(new Pojo(new byte[] {5}), "bug"));
> DataSet> right = env.fromElements(
> new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "white"),
> new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}), 
> "green"),
> new Tuple2<>(new Pojo(new byte[] {1}), "Flink"),
> new Tuple2<>(new Pojo(new byte[] {2}), "evil"),
> new Tuple2<>(new Pojo(new byte[] {5}), "fix"));
> // will not print anything unless Pojo has a real hashCode() 
> implementation
> 
> left.join(right).where(0).equalTo(0).projectFirst(1).projectSecond(1).print();
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...

2016-05-19 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r63869055
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 ---
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
+ * database.
+ * 
+ * Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
+   private ClusterBuilder builder;
+   private transient Cluster cluster;
+   private transient Session session;
+
+   private String keySpace = "flink_auxiliary";
+   private String table = "checkpoints_";
+
+   private transient PreparedStatement deleteStatement;
+   private transient PreparedStatement updateStatement;
+   private transient PreparedStatement selectStatement;
+
+   public CassandraCommitter(ClusterBuilder builder) {
+   this.builder = builder;
+   ClosureCleaner.clean(builder, true);
+   }
+
+   public CassandraCommitter(ClusterBuilder builder, String keySpace) {
+   this(builder);
+   this.keySpace = keySpace;
+   }
+
+   /**
+* Internally used to set the job ID after instantiation.
+*
+* @param id
+* @throws Exception
+*/
+   public void setJobId(String id) throws Exception {
+   super.setJobId(id);
+   table += id;
+   }
+
+   /**
+* Generates the necessary tables to store information.
+*
+* @return
+* @throws Exception
+*/
+   @Override
+   public void createResource() throws Exception {
+   cluster = builder.getCluster();
+   session = cluster.connect();
+
+   session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s 
with replication={'class':'SimpleStrategy', 'replication_factor':3};", 
keySpace));
+   session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s 
(sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, 
sub_id));", keySpace, table));
+
+   try {
+   session.close();
+   } catch (Exception e) {
+   LOG.error("Error while closing session.", e);
--- End diff --

This means that a failed closing operation cannot leave the external system 
in a corrupted state?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291020#comment-15291020
 ] 

ASF GitHub Bot commented on FLINK-3311:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/1771#discussion_r63869055
  
--- Diff: 
flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java
 ---
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
+
+/**
+ * CheckpointCommitter that saves information about completed checkpoints 
within a separate table in a cassandra
+ * database.
+ * 
+ * Entries are in the form |operator_id | subtask_id | 
last_completed_checkpoint|
+ */
+public class CassandraCommitter extends CheckpointCommitter {
+   private ClusterBuilder builder;
+   private transient Cluster cluster;
+   private transient Session session;
+
+   private String keySpace = "flink_auxiliary";
+   private String table = "checkpoints_";
+
+   private transient PreparedStatement deleteStatement;
+   private transient PreparedStatement updateStatement;
+   private transient PreparedStatement selectStatement;
+
+   public CassandraCommitter(ClusterBuilder builder) {
+   this.builder = builder;
+   ClosureCleaner.clean(builder, true);
+   }
+
+   public CassandraCommitter(ClusterBuilder builder, String keySpace) {
+   this(builder);
+   this.keySpace = keySpace;
+   }
+
+   /**
+* Internally used to set the job ID after instantiation.
+*
+* @param id
+* @throws Exception
+*/
+   public void setJobId(String id) throws Exception {
+   super.setJobId(id);
+   table += id;
+   }
+
+   /**
+* Generates the necessary tables to store information.
+*
+* @return
+* @throws Exception
+*/
+   @Override
+   public void createResource() throws Exception {
+   cluster = builder.getCluster();
+   session = cluster.connect();
+
+   session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s 
with replication={'class':'SimpleStrategy', 'replication_factor':3};", 
keySpace));
+   session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s 
(sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, 
sub_id));", keySpace, table));
+
+   try {
+   session.close();
+   } catch (Exception e) {
+   LOG.error("Error while closing session.", e);
--- End diff --

This means that a failed closing operation cannot leave the external system 
in a corrupted state?


> Add a connector for streaming data into Cassandra
> -
>
> Key: FLINK-3311
> URL: https://issues.apache.org/jira/browse/FLINK-3311
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Robert Metzger
>Assignee: Andrea Sella
>
> We had users in the past asking for a Flink+Cassandra integration.
> It seems that there is a well-developed java client for connecting into 
> Cassandra: https://github.com/datastax/java-driver (ASL 2.0)
> There are also tutorials out there on how to start a local cassandra instance 
> (for the tests): 
> http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html
> For the data types, I think we should 

[jira] [Commented] (FLINK-3933) Add an auto-type-extracting DeserializationSchema

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291017#comment-15291017
 ] 

ASF GitHub Bot commented on FLINK-3933:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2010#discussion_r63868527
  
--- Diff: docs/apis/streaming/connectors/kafka.md ---
@@ -142,18 +142,24 @@ for querying the list of topics and partitions.
 For this to work, the consumer needs to be able to access the consumers 
from the machine submitting the job to the Flink cluster.
 If you experience any issues with the Kafka consumer on the client side, 
the client log might contain information about failed requests, etc.
 
-# The `DeserializationSchema`
+# **The `DeserializationSchema`**
--- End diff --

The whole doc uses pretty low weight formatting. It is hard to recognize a 
heading as such. I overlooked the section initially.


> Add an auto-type-extracting DeserializationSchema
> -
>
> Key: FLINK-3933
> URL: https://issues.apache.org/jira/browse/FLINK-3933
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.0.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.1.0
>
>
> When creating a {{DeserializationSchema}}, people need to manually worry 
> about how to provide the produced type's {{TypeInformation}}.
> We should add a base utility {{AbstractDeserializationSchema}} that provides 
> that automatically via the type extractor.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3933] [streaming API] Add AbstractDeser...

2016-05-19 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/2010#discussion_r63868527
  
--- Diff: docs/apis/streaming/connectors/kafka.md ---
@@ -142,18 +142,24 @@ for querying the list of topics and partitions.
 For this to work, the consumer needs to be able to access the consumers 
from the machine submitting the job to the Flink cluster.
 If you experience any issues with the Kafka consumer on the client side, 
the client log might contain information about failed requests, etc.
 
-# The `DeserializationSchema`
+# **The `DeserializationSchema`**
--- End diff --

The whole doc uses pretty low weight formatting. It is hard to recognize a 
heading as such. I overlooked the section initially.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams

2016-05-19 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291006#comment-15291006
 ] 

Gabor Gevay commented on FLINK-2147:


> From a first look, something like StreamGroupedFold would be enough right?

Sorry, I'm not sure. I suggest you ask on the mailing list, and then probably 
someone who knows streaming better than me will respond. Unfortunately I don't 
have enough time now to delve deep into this.

By the way, maybe you could start with this Jira: 
https://issues.apache.org/jira/browse/FLINK-2144
There are some similarities to this one, but it is more straightforward to 
implement.

> Approximate calculation of frequencies in data streams
> --
>
> Key: FLINK-2147
> URL: https://issues.apache.org/jira/browse/FLINK-2147
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Reporter: Gabor Gevay
>  Labels: approximate, statistics
>
> Count-Min sketch is a hashing-based algorithm for approximately keeping track 
> of the frequencies of elements in a data stream. It is described by Cormode 
> et al. in the following paper:
> http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf
> Note that this algorithm can be conveniently implemented in a distributed 
> way, as described in section 3.2 of the paper.
> The paper
> http://www.vldb.org/conf/2002/S10P03.pdf
> also describes algorithms for approximately keeping track of frequencies, but 
> here the user can specify a threshold below which she is not interested in 
> the frequency of an element. The error-bounds are also different than the 
> Count-min sketch algorithm.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3909) Maven Failsafe plugin may report SUCCESS on failed tests

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291005#comment-15291005
 ] 

ASF GitHub Bot commented on FLINK-3909:
---

Github user markreddy commented on the pull request:

https://github.com/apache/flink/pull/2003#issuecomment-220307660
  
I took a look at the root issue of upgrading to 2.19.1. I'm able to 
reliably reproduce this issue on my laptop. 

When I bump the version I get the same as @tillrohrmann 
`java.lang.NoSuchMethodError:

org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(Lorg/apache/flink/configuration/Configuration;)Lorg/apache/curator/framework/CuratorFramework`

When running the build in debug mode for both 2.18.1 and 2.19.1 I observed 
some differences in the classpath. 

**2.18.1**
`[DEBUG] boot(compact) classpath:  surefire-booter-2.18.1.jar  
surefire-api-2.18.1.jar  test-classes  classes  
flink-core-1.1-SNAPSHOT.jar.`

**2.19.1**
`[DEBUG] boot(compact) classpath:  surefire-booter-2.19.1.jar  
surefire-api-2.19.1.jar  test-classes  flink-runtime_2.10-1.1-SNAPSHOT.jar  
flink-core-1.1-SNAPSHOT.jar.`

So 2.19.1 has changed behaviour. What is loaded onto the classpath differs, 
in 2.19.1 instead of loading target/classes it loads the actual built jar. 
https://issues.apache.org/jira/browse/SUREFIRE-855 confirms this. 

The issue with this is the final jar has curator shaded while the test 
classes are looking for the unshaded version of curator, as shown by the debug 
output:
`Lorg/apache/curator/framework/CuratorFramework`

If @mxm can get everything working on a lower version thats the easiest 
solution. If not or we want to proceed with moving up in version, at least we 
know the root of the issue and can work from there




> Maven Failsafe plugin may report SUCCESS on failed tests
> 
>
> Key: FLINK-3909
> URL: https://issues.apache.org/jira/browse/FLINK-3909
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.1.0
>Reporter: Maximilian Michels
>Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>
> The following build completed successfully on Travis but there are actually 
> test failures: https://travis-ci.org/apache/flink/jobs/129943398#L5402



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3909] update Maven Failsafe version

2016-05-19 Thread markreddy
Github user markreddy commented on the pull request:

https://github.com/apache/flink/pull/2003#issuecomment-220307660
  
I took a look at the root issue of upgrading to 2.19.1. I'm able to 
reliably reproduce this issue on my laptop. 

When I bump the version I get the same as @tillrohrmann 
`java.lang.NoSuchMethodError:

org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(Lorg/apache/flink/configuration/Configuration;)Lorg/apache/curator/framework/CuratorFramework`

When running the build in debug mode for both 2.18.1 and 2.19.1 I observed 
some differences in the classpath. 

**2.18.1**
`[DEBUG] boot(compact) classpath:  surefire-booter-2.18.1.jar  
surefire-api-2.18.1.jar  test-classes  classes  
flink-core-1.1-SNAPSHOT.jar.`

**2.19.1**
`[DEBUG] boot(compact) classpath:  surefire-booter-2.19.1.jar  
surefire-api-2.19.1.jar  test-classes  flink-runtime_2.10-1.1-SNAPSHOT.jar  
flink-core-1.1-SNAPSHOT.jar.`

So 2.19.1 has changed behaviour. What is loaded onto the classpath differs, 
in 2.19.1 instead of loading target/classes it loads the actual built jar. 
https://issues.apache.org/jira/browse/SUREFIRE-855 confirms this. 

The issue with this is the final jar has curator shaded while the test 
classes are looking for the unshaded version of curator, as shown by the debug 
output:
`Lorg/apache/curator/framework/CuratorFramework`

If @mxm can get everything working on a lower version thats the easiest 
solution. If not or we want to proceed with moving up in version, at least we 
know the root of the issue and can work from there




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-3935) Invalid check of key and ordering fields in PartitionNode

2016-05-19 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-3935:


 Summary: Invalid check of key and ordering fields in PartitionNode
 Key: FLINK-3935
 URL: https://issues.apache.org/jira/browse/FLINK-3935
 Project: Flink
  Issue Type: Bug
  Components: Optimizer
Affects Versions: 1.1.0
Reporter: Fabian Hueske
Assignee: Fabian Hueske
 Fix For: 1.1.0


{{PartitionNode}} checks for range partitioning that partition keys and the 
fields of the ordering are identical. However the check is not correctly 
implemented because {{PartitionNode}} holds the partition keys as an unordered 
{{FieldSet}} which is compared against an ordered {{FieldList}} provided by the 
{{Ordering}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3934) Prevent translation of non-equi joins in DataSetJoinRule

2016-05-19 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-3934:


 Summary: Prevent translation of non-equi joins in DataSetJoinRule
 Key: FLINK-3934
 URL: https://issues.apache.org/jira/browse/FLINK-3934
 Project: Flink
  Issue Type: Bug
  Components: Table API
Affects Versions: 1.1.0
Reporter: Fabian Hueske
Assignee: Fabian Hueske


At the moment, also non-equi joins are translated into {{DataSetJoin}}s. 
To prevent such plans from being picked, we assign huge costs and eventually 
fail their translation into DataSet programs.

A better solution is to prevent a non-equi join from being translated into a 
{{DataSetJoin}} in the {{DataSetJoinRule}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3761) Introduce key group state backend

2016-05-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15290909#comment-15290909
 ] 

ASF GitHub Bot commented on FLINK-3761:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/1988#issuecomment-220293946
  
Yeah, we were also wondering wether it would make sense to allow the state 
itself to be repartitioned, i.e. union and then split into the new parallelism. 
In this way we wouldn't read all state in every operator.


> Introduce key group state backend
> -
>
> Key: FLINK-3761
> URL: https://issues.apache.org/jira/browse/FLINK-3761
> Project: Flink
>  Issue Type: Sub-task
>  Components: state backends
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> After an off-line discussion with [~aljoscha], we came to the conclusion that 
> it would be beneficial to reflect the differences between a keyed and a 
> non-keyed stream also in the state backends. A state backend which is used 
> for a keyed stream offers a value, list, folding and value state and has to 
> group its keys into key groups. 
> A state backend for non-keyed streams can only offer a union state to make it 
> work with dynamic scaling. A union state is a state which is broadcasted to 
> all tasks in case of a recovery. The state backends can then select what 
> information they need to recover from the whole state (formerly distributed).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


  1   2   >