[jira] [Updated] (FLINK-3222) Incorrect shift amount in OperatorCheckpointStats#hashCode()
[ https://issues.apache.org/jira/browse/FLINK-3222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-3222: -- Description: Here is the related code: {code} result = 31 * result + (int) (subTaskStats.length ^ (subTaskStats.length >>> 32)); {code} subTaskStats.length is an int, so the shift amount of 32 is greater than the 31-bit maximum for an int; Java masks the shift distance to its low five bits, which makes the {{>>> 32}} a no-op and the XOR term always zero. > Incorrect shift amount in OperatorCheckpointStats#hashCode() > Key: FLINK-3222 > URL: https://issues.apache.org/jira/browse/FLINK-3222 > Project: Flink > Issue Type: Bug > Reporter: Ted Yu > Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
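The no-op described above can be demonstrated directly. This is a hedged sketch (class and method names are placeholders, not Flink's actual code): for an int field, the long-style hash recipe is unnecessary, and the field can be mixed into the hash as-is.

```java
public class ShiftHashDemo {
    // Buggy pattern from the issue: for an int operand, Java masks the shift
    // distance to the low five bits, so (length >>> 32) == length and the
    // XOR yields 0 -- the field never influences the hash.
    public static int buggyTerm(int length) {
        return (int) (length ^ (length >>> 32));
    }

    // Fix: an int can be mixed into the hash directly.
    public static int fixedHash(int result, int length) {
        return 31 * result + length;
    }

    public static void main(String[] args) {
        assert buggyTerm(42) == 0;   // always zero, regardless of input
        assert buggyTerm(-7) == 0;
        assert fixedHash(17, 42) == 17 * 31 + 42;
        System.out.println("ok");
    }
}
```

The `(x ^ (x >>> 32))` recipe is correct only for a long operand, where the shift distance is masked to six bits and 32 is meaningful.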
[jira] [Updated] (FLINK-3734) Unclosed DataInputView in AbstractAlignedProcessingTimeWindowOperator#restoreState()
[ https://issues.apache.org/jira/browse/FLINK-3734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-3734: -- Description: {code} DataInputView in = inputState.getState(getUserCodeClassloader()); final long nextEvaluationTime = in.readLong(); final long nextSlideTime = in.readLong(); AbstractKeyedTimePanes panes = createPanes(keySelector, function); panes.readFromInput(in, keySerializer, stateTypeSerializer); restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime); } {code} The DataInputView in is not closed upon return. > Unclosed DataInputView in AbstractAlignedProcessingTimeWindowOperator#restoreState() > Key: FLINK-3734 > URL: https://issues.apache.org/jira/browse/FLINK-3734 > Project: Flink > Issue Type: Bug > Reporter: Ted Yu > Priority: Minor
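The leak described above is the usual Java pattern of a stream that is obtained and read but never closed. A minimal, hedged sketch of the general remedy with try-with-resources (the types and method names below are illustrative stand-ins, not Flink's state API, and whether closing is appropriate depends on who owns the backing stream):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class RestoreDemo {
    // Illustrative stand-in for reading two longs from a checkpoint stream,
    // guaranteeing the stream is closed on every exit path, including when
    // a read throws.
    public static long[] restore(byte[] snapshot) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot))) {
            long nextEvaluationTime = in.readLong();
            long nextSlideTime = in.readLong();
            return new long[] { nextEvaluationTime, nextSlideTime };
        } // in.close() runs here automatically
    }

    public static void main(String[] args) throws IOException {
        // Build a fake snapshot containing two longs.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeLong(1000L);
        out.writeLong(2000L);
        long[] restored = restore(bos.toByteArray());
        assert restored[0] == 1000L && restored[1] == 2000L;
        System.out.println("ok");
    }
}
```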
[jira] [Created] (FLINK-3939) Prevent distinct aggregates and grouping sets from being translated
Fabian Hueske created FLINK-3939: Summary: Prevent distinct aggregates and grouping sets from being translated Key: FLINK-3939 URL: https://issues.apache.org/jira/browse/FLINK-3939 Project: Flink Issue Type: Bug Components: Table API Reporter: Fabian Hueske Assignee: Fabian Hueske Fix For: 1.1.0 Flink's SQL interface is currently not capable of executing distinct aggregates and grouping sets. We need to prevent queries with these operations from being translated, by adapting the DataSetAggregateRule.
[jira] [Updated] (FLINK-3801) Upgrade Joda-Time library to 2.9.3
[ https://issues.apache.org/jira/browse/FLINK-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-3801: -- Description: Currently joda-time 2.5 is used, which is quite old. We should upgrade to 2.9.3. > Upgrade Joda-Time library to 2.9.3 > Key: FLINK-3801 > URL: https://issues.apache.org/jira/browse/FLINK-3801 > Project: Flink > Issue Type: Improvement > Reporter: Ted Yu > Priority: Minor
[GitHub] flink pull request: [script] Simple fix of typo from JobManager to...
Github user hsaputra commented on the pull request: https://github.com/apache/flink/pull/2001#issuecomment-220454866 Thanks @StephanEwen , have merged the change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15292153#comment-15292153 ] ASF GitHub Bot commented on FLINK-3780: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63956013 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java --- @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library.similarity; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree; +import org.apache.flink.graph.library.similarity.JaccardIndex.Result; +import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.List; + +/** + * The Jaccard Index measures the similarity between vertex neighborhoods. + * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are common). + * + * This implementation produces similarity scores for each pair of vertices + * in the graph with at least one common neighbor; equivalently, this is the + * set of all non-zero Jaccard Similarity coefficients. + * + * The input graph must be a simple, undirected graph containing no duplicate + * edges or self-loops. 
+ * + * @param graph ID type + * @param vertex value type + * @param edge value type + */ +public class JaccardIndex, VV, EV> +implements GraphAlgorithm> { + + public static final int DEFAULT_GROUP_SIZE = 64; + + // Optional configuration + private int groupSize = DEFAULT_GROUP_SIZE; + + private boolean unboundedScores = true; + + private int minimumScoreNumerator = 0; + + private int minimumScoreDenominator = 1; + + private int maximumScoreNumerator = 1; + + private int maximumScoreDenominator = 0; + + private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN; + + /** +* Override the default group size for the quadratic expansion of neighbor +* pairs. Small groups generate more data whereas large groups distribute +* computation less evenly among tasks. +* +* @param groupSize the group size for the quadratic expansion of neighbor pairs +* @return this +*/ + public JaccardIndex setGroupSize(int groupSize) { --- End diff -- I don't know that 64 is the best value but I think it is close. I clarified the javadoc to suggest that users not change this value. > Jaccard Similarity > -- > > Key: FLINK-3780 > URL: https://issues.apache.org/jira/browse/FLINK-3780 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Implement a Jaccard Similarity algorithm computing all non-zero similarity > scores. This algorithm is
[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63956013 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java --- @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library.similarity; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree; +import org.apache.flink.graph.library.similarity.JaccardIndex.Result; +import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.List; + +/** + * The Jaccard Index measures the similarity between vertex neighborhoods. + * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are common). + * + * This implementation produces similarity scores for each pair of vertices + * in the graph with at least one common neighbor; equivalently, this is the + * set of all non-zero Jaccard Similarity coefficients. + * + * The input graph must be a simple, undirected graph containing no duplicate + * edges or self-loops. 
+ * + * @param graph ID type + * @param vertex value type + * @param edge value type + */ +public class JaccardIndex, VV, EV> +implements GraphAlgorithm> { + + public static final int DEFAULT_GROUP_SIZE = 64; + + // Optional configuration + private int groupSize = DEFAULT_GROUP_SIZE; + + private boolean unboundedScores = true; + + private int minimumScoreNumerator = 0; + + private int minimumScoreDenominator = 1; + + private int maximumScoreNumerator = 1; + + private int maximumScoreDenominator = 0; + + private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN; + + /** +* Override the default group size for the quadratic expansion of neighbor +* pairs. Small groups generate more data whereas large groups distribute +* computation less evenly among tasks. +* +* @param groupSize the group size for the quadratic expansion of neighbor pairs +* @return this +*/ + public JaccardIndex setGroupSize(int groupSize) { --- End diff -- I don't know that 64 is the best value but I think it is close. I clarified the javadoc to suggest that users not change this value. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63951105 --- Diff: flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.examples; + +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvOutputFormat; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.translate.LongValueToIntValue; +import org.apache.flink.graph.asm.translate.TranslateGraphIds; +import org.apache.flink.graph.generator.RMatGraph; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import java.text.NumberFormat; + +/** + * Driver for the library implementation of Jaccard Index. + * + * This example generates an undirected RMat graph with the given scale and + * edge factor then calculates all non-zero Jaccard Index similarity scores + * between vertices. 
+ * + * @see org.apache.flink.graph.library.similarity.JaccardIndex + */ +public class JaccardIndex { + + public static final int DEFAULT_SCALE = 10; + + public static final int DEFAULT_EDGE_FACTOR = 16; + + public static final boolean DEFAULT_CLIP_AND_FLIP = true; + + public static void main(String[] args) throws Exception { + // Set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + + ParameterTool parameters = ParameterTool.fromArgs(args); + + // Generate RMat graph + int scale = parameters.getInt("scale", DEFAULT_SCALE); + int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); + + RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); + + long vertexCount = 1L << scale; + long edgeCount = vertexCount * edgeFactor; + + boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); + + Graphgraph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .setSimpleGraph(true, clipAndFlip) + .generate(); + + DataSet js; + + if (scale > 32) { + js = graph + .run(new org.apache.flink.graph.library.similarity.JaccardIndex ()); + } else { + js = graph + .run(new TranslateGraphIds (new LongValueToIntValue())) + .run(new org.apache.flink.graph.library.similarity.JaccardIndex ()); + } + + switch (parameters.get("output", "")) { --- End diff -- Is the usage statement sufficient? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15292103#comment-15292103 ] ASF GitHub Bot commented on FLINK-3780: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63951105 --- Diff: flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.examples; + +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvOutputFormat; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.translate.LongValueToIntValue; +import org.apache.flink.graph.asm.translate.TranslateGraphIds; +import org.apache.flink.graph.generator.RMatGraph; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import java.text.NumberFormat; + +/** + * Driver for the library implementation of Jaccard Index. + * + * This example generates an undirected RMat graph with the given scale and + * edge factor then calculates all non-zero Jaccard Index similarity scores + * between vertices. 
+ * + * @see org.apache.flink.graph.library.similarity.JaccardIndex + */ +public class JaccardIndex { + + public static final int DEFAULT_SCALE = 10; + + public static final int DEFAULT_EDGE_FACTOR = 16; + + public static final boolean DEFAULT_CLIP_AND_FLIP = true; + + public static void main(String[] args) throws Exception { + // Set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + + ParameterTool parameters = ParameterTool.fromArgs(args); + + // Generate RMat graph + int scale = parameters.getInt("scale", DEFAULT_SCALE); + int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); + + RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); + + long vertexCount = 1L << scale; + long edgeCount = vertexCount * edgeFactor; + + boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); + + Graphgraph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .setSimpleGraph(true, clipAndFlip) + .generate(); + + DataSet js; + + if (scale > 32) { + js = graph + .run(new org.apache.flink.graph.library.similarity.JaccardIndex ()); + } else { + js = graph + .run(new TranslateGraphIds (new LongValueToIntValue())) + .run(new org.apache.flink.graph.library.similarity.JaccardIndex ()); + } + + switch (parameters.get("output", "")) { --- End diff -- Is the usage statement sufficient? > Jaccard Similarity > -- > > Key: FLINK-3780 > URL: https://issues.apache.org/jira/browse/FLINK-3780 > Project:
[GitHub] flink pull request: [script] Simple fix of typo from JobManager to...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2001
[jira] [Closed] (FLINK-3935) Invalid check of key and ordering fields in PartitionNode
[ https://issues.apache.org/jira/browse/FLINK-3935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-3935. Resolution: Fixed Fixed with 853a5359eb76a987f3fee33065b34051c2fa6094 > Invalid check of key and ordering fields in PartitionNode > Key: FLINK-3935 > URL: https://issues.apache.org/jira/browse/FLINK-3935 > Project: Flink > Issue Type: Bug > Components: Optimizer > Affects Versions: 1.1.0 > Reporter: Fabian Hueske > Assignee: Fabian Hueske > Fix For: 1.1.0 > > {{PartitionNode}} checks for range partitioning that partition keys and the fields of the ordering are identical. However, the check is not correctly implemented because {{PartitionNode}} holds the partition keys as an unordered {{FieldSet}}, which is compared against an ordered {{FieldList}} provided by the {{Ordering}}.
[jira] [Closed] (FLINK-3934) Prevent translation of non-equi joins in DataSetJoinRule
[ https://issues.apache.org/jira/browse/FLINK-3934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-3934. Resolution: Fixed Fix Version/s: 1.1.0 Fixed with 3d6ce294123d90ca7bb029d4041f29a1ae1ccd81 > Prevent translation of non-equi joins in DataSetJoinRule > Key: FLINK-3934 > URL: https://issues.apache.org/jira/browse/FLINK-3934 > Project: Flink > Issue Type: Bug > Components: Table API > Affects Versions: 1.1.0 > Reporter: Fabian Hueske > Assignee: Fabian Hueske > Fix For: 1.1.0 > > At the moment, non-equi joins are also translated into {{DataSetJoin}}s. To prevent such plans from being picked, we assign huge costs and eventually fail their translation into DataSet programs. A better solution is to prevent a non-equi join from being translated into a {{DataSetJoin}} in the {{DataSetJoinRule}}.
[jira] [Commented] (FLINK-3836) Change Histogram to enable Long counters
[ https://issues.apache.org/jira/browse/FLINK-3836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291594#comment-15291594 ] ASF GitHub Bot commented on FLINK-3836: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1966#issuecomment-220395032 I think for now, we should add a `@Deprecated` annotation to the `Histogram` and its related method on `RuntimeContext` and mention in the comment that we encourage the `LongHistogram` instead. On a major release, we should go through all deprecated parts and remove them. > Change Histogram to enable Long counters > Key: FLINK-3836 > URL: https://issues.apache.org/jira/browse/FLINK-3836 > Project: Flink > Issue Type: Improvement > Components: Core > Affects Versions: 1.0.2 > Reporter: Maximilian Bode > Assignee: Maximilian Bode > Priority: Minor > > Change flink/flink-core/src/main/java/org/apache/flink/api/common/accumulators/Histogram.java to enable Long counts instead of Integer. In particular, change the TreeMap to be.
[GitHub] flink pull request: [FLINK-3836] Add LongHistogram accumulator
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1966#issuecomment-220395032 I think for now, we should add a `@Deprecated` annotation to the `Histogram` and its related method on `RuntimeContext` and mention in the comment that we encourage the `LongHistogram` instead. On a major release, we should go through all deprecated parts and remove them.
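The int-to-long counter change under discussion can be sketched as follows. This is a simplified, hypothetical accumulator, not the actual Flink `LongHistogram` class: the point is only that bucket counts held as long cannot overflow at Integer.MAX_VALUE the way int-valued counts would on very large datasets.

```java
import java.util.TreeMap;

public class LongHistogramSketch {
    // Sorted bucket -> count map; counts are long to avoid int overflow.
    private final TreeMap<Integer, Long> counts = new TreeMap<>();

    public void add(int value) {
        // merge() inserts 1 for a new bucket or sums into an existing one.
        counts.merge(value, 1L, Long::sum);
    }

    public long getCount(int value) {
        return counts.getOrDefault(value, 0L);
    }

    public static void main(String[] args) {
        LongHistogramSketch h = new LongHistogramSketch();
        h.add(5);
        h.add(5);
        h.add(7);
        assert h.getCount(5) == 2L;
        assert h.getCount(7) == 1L;
        assert h.getCount(9) == 0L;
        System.out.println("ok");
    }
}
```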
[jira] [Commented] (FLINK-3909) Maven Failsafe plugin may report SUCCESS on failed tests
[ https://issues.apache.org/jira/browse/FLINK-3909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291517#comment-15291517 ] ASF GitHub Bot commented on FLINK-3909: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/2003#issuecomment-220388275 I get segfaults from RocksDB in my Travis tests: https://travis-ci.org/mxm/flink/builds/131412162 Let's see how the builds here go. All in all, looks ready to be merged. > Maven Failsafe plugin may report SUCCESS on failed tests > Key: FLINK-3909 > URL: https://issues.apache.org/jira/browse/FLINK-3909 > Project: Flink > Issue Type: Bug > Components: Build System > Affects Versions: 1.1.0 > Reporter: Maximilian Michels > Assignee: Maximilian Michels > Fix For: 1.1.0 > > The following build completed successfully on Travis but there are actually test failures: https://travis-ci.org/apache/flink/jobs/129943398#L5402
[GitHub] flink pull request: [FLINK-3909] update Maven Failsafe version
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/2003#issuecomment-220388275 I get segfaults from RocksDB in my Travis tests: https://travis-ci.org/mxm/flink/builds/131412162 Let's see how the builds here go. All in all, looks ready to be merged.
[jira] [Commented] (FLINK-3927) TaskManager registration may fail if Yarn versions don't match
[ https://issues.apache.org/jira/browse/FLINK-3927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291484#comment-15291484 ] ASF GitHub Bot commented on FLINK-3927: --- GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2013 [FLINK-3927][yarn] make container id consistent across Hadoop versions Fixes a bug where the container id generation would vary across Hadoop versions of the client/cluster. The ResourceManager assumes a persistent resource id. Based on #2012, to re-enable the Yarn tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-3927 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2013.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2013 commit 422e078c93b558dba3d0c6a53643824198e2c545 Author: Maximilian Michels Date: 2016-05-19T12:29:12Z [FLINK-3927][yarn] make container id consistent across Hadoop versions - introduce a unique container id independent of the Hadoop version - improve printing of exceptions during registration - minor improvements to the Yarn ResourceManager code commit c27fc8553f4dc0fbcee09c52848477cff2de0b11 Author: Maximilian Michels Date: 2016-05-19T15:59:23Z [FLINK-3938] re-enable Yarn tests As of 70978f560fa5cab6d84ec27d58faa2627babd362, the Yarn tests were not executed anymore. They were moved to the test directory but there was still a Maven configuration in place to change the test directory.
> TaskManager registration may fail if Yarn versions don't match > Key: FLINK-3927 > URL: https://issues.apache.org/jira/browse/FLINK-3927 > Project: Flink > Issue Type: Bug > Components: ResourceManager > Affects Versions: 1.1.0 > Reporter: Maximilian Michels > Assignee: Maximilian Michels > Fix For: 1.1.0 > > Flink's ResourceManager uses the Yarn container ids to identify connecting task managers. Yarn's stringified container id may not be consistent across different Hadoop versions, e.g. Hadoop 2.3.0 and Hadoop 2.7.1. The ResourceManager gets it from the Yarn reports while the TaskManager infers it from the Yarn environment variables. The ResourceManager may use the Hadoop 2.3.0 version while the cluster runs Hadoop 2.7.1. > The solution is to pass the ID through a custom environment variable which is set by the ResourceManager before launching the TaskManager in the container. That way we will always use the Hadoop client's id generation method.
[GitHub] flink pull request: [FLINK-3927][yarn] make container id consisten...
GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2013 [FLINK-3927][yarn] make container id consistent across Hadoop versions Fixes a bug where the container id generation would vary across Hadoop versions of the client/cluster. The ResourceManager assumes a persistent resource id. Based on #2012, to re-enable the Yarn tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-3927 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2013.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2013 commit 422e078c93b558dba3d0c6a53643824198e2c545 Author: Maximilian Michels Date: 2016-05-19T12:29:12Z [FLINK-3927][yarn] make container id consistent across Hadoop versions - introduce a unique container id independent of the Hadoop version - improve printing of exceptions during registration - minor improvements to the Yarn ResourceManager code commit c27fc8553f4dc0fbcee09c52848477cff2de0b11 Author: Maximilian Michels Date: 2016-05-19T15:59:23Z [FLINK-3938] re-enable Yarn tests As of 70978f560fa5cab6d84ec27d58faa2627babd362, the Yarn tests were not executed anymore. They were moved to the test directory but there was still a Maven configuration in place to change the test directory.
[jira] [Commented] (FLINK-3190) Retry rate limits for DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291431#comment-15291431 ] ASF GitHub Bot commented on FLINK-3190: --- Github user fijolekProjects closed the pull request at: https://github.com/apache/flink/pull/1954 > Retry rate limits for DataStream API > > > Key: FLINK-3190 > URL: https://issues.apache.org/jira/browse/FLINK-3190 > Project: Flink > Issue Type: Improvement >Reporter: Sebastian Klemke >Assignee: Michał Fijołek >Priority: Minor > > For a long running stream processing job, absolute numbers of retries don't > make much sense: The job will accumulate transient errors over time and will > die eventually when thresholds are exceeded. Rate limits are better suited in > this scenario: A job should only die, if it fails too often in a given time > frame. To better overcome transient errors, retry delays could be used, as > suggested in other issues. > Absolute numbers of retries can still make sense, if failing operators don't > make any progress at all. We can measure progress by OperatorState changes > and by observing output, as long as the operator in question is not a sink. > If operator state changes and/or operator produces output, we can assume it > makes progress. > As an example, let's say we configured a retry rate limit of 10 retries per > hour and a non-sink operator A. If the operator fails once every 10 minutes > and produces output between failures, it should not lead to job termination. > But if the operator fails 11 times in an hour or does not produce output > between 11 consecutive failures, job should be terminated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3190) Retry rate limits for DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291432#comment-15291432 ] ASF GitHub Bot commented on FLINK-3190: --- GitHub user fijolekProjects reopened a pull request: https://github.com/apache/flink/pull/1954 [FLINK-3190] failure rate restart strategy Failure rate restart strategy - job should only die, if it fails too often in a given time frame You can merge this pull request into a Git repository by running: $ git pull https://github.com/fijolekProjects/flink FLINK-3190 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/1954.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1954 commit 1c78e8cbee8cc842daac3bd72891dbf7a515bf21 Author: Michal Fijolek Date: 2016-03-13T00:40:15Z [FLINK-3190] failure rate restart strategy > Retry rate limits for DataStream API > > > Key: FLINK-3190 > URL: https://issues.apache.org/jira/browse/FLINK-3190 > Project: Flink > Issue Type: Improvement >Reporter: Sebastian Klemke >Assignee: Michał Fijołek >Priority: Minor > > For a long running stream processing job, absolute numbers of retries don't > make much sense: The job will accumulate transient errors over time and will > die eventually when thresholds are exceeded. Rate limits are better suited in > this scenario: A job should only die, if it fails too often in a given time > frame. To better overcome transient errors, retry delays could be used, as > suggested in other issues. > Absolute numbers of retries can still make sense, if failing operators don't > make any progress at all. We can measure progress by OperatorState changes > and by observing output, as long as the operator in question is not a sink. > If operator state changes and/or operator produces output, we can assume it > makes progress.
> As an example, let's say we configured a retry rate limit of 10 retries per > hour and a non-sink operator A. If the operator fails once every 10 minutes > and produces output between failures, it should not lead to job termination. > But if the operator fails 11 times in an hour or does not produce output > between 11 consecutive failures, job should be terminated. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
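The policy in the example above (10 retries per hour are fine; an 11th failure within the hour terminates the job) amounts to a sliding-window failure counter. A minimal sketch of that idea follows; class and method names are illustrative and not Flink's actual restart-strategy API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class FailureRateLimiter {

    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final Deque<Long> failureTimestamps = new ArrayDeque<>();

    public FailureRateLimiter(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    // Record a failure at time 'now' (millis) and report whether the job
    // may still restart, i.e. whether the failure rate stays within bounds.
    public boolean registerFailureAndCheck(long now) {
        failureTimestamps.addLast(now);
        // Evict failures that fell out of the trailing time window.
        while (!failureTimestamps.isEmpty()
                && now - failureTimestamps.peekFirst() > intervalMillis) {
            failureTimestamps.removeFirst();
        }
        return failureTimestamps.size() <= maxFailuresPerInterval;
    }
}
```

With `new FailureRateLimiter(10, 3_600_000L)`, a failure every ten minutes is always allowed (the window never holds more than a handful of failures), while eleven failures packed into one hour trip the limit and the job would be terminated.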
[jira] [Commented] (FLINK-3938) Yarn tests don't run on the current master
[ https://issues.apache.org/jira/browse/FLINK-3938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291368#comment-15291368 ] ASF GitHub Bot commented on FLINK-3938: --- GitHub user mxm opened a pull request: https://github.com/apache/flink/pull/2012 [FLINK-3938] re-enable Yarn tests Fixes a regression of #1915. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mxm/flink FLINK-3938 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2012.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2012 commit a2708f1c2e2f59a92f39d32a5e302b7f57267678 Author: Maximilian Michels Date: 2016-05-19T15:59:23Z [FLINK-3938] re-enable Yarn tests As of 70978f560fa5cab6d84ec27d58faa2627babd362, the Yarn tests were not executed anymore. They were moved to the test directory but there was still a Maven configuration in place to change the test directory. > Yarn tests don't run on the current master > -- > > Key: FLINK-3938 > URL: https://issues.apache.org/jira/browse/FLINK-3938 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels >Priority: Critical > Fix For: 1.1.0 > > > Independently of FLINK-3909, I just discovered that the Yarn tests don't run > on the current master (09b428b).
> {noformat} > [INFO] > > [INFO] Building flink-yarn-tests 1.1-SNAPSHOT > [INFO] > > [INFO] > [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ > flink-yarn-tests_2.10 --- > [INFO] > [INFO] --- maven-checkstyle-plugin:2.16:check (validate) @ > flink-yarn-tests_2.10 --- > [INFO] > [INFO] --- maven-enforcer-plugin:1.3.1:enforce (enforce-maven) @ > flink-yarn-tests_2.10 --- > [INFO] > [INFO] --- build-helper-maven-plugin:1.7:add-source (add-source) @ > flink-yarn-tests_2.10 --- > [INFO] Source directory: > /home/travis/build/apache/flink/flink-yarn-tests/src/main/scala added. > [INFO] > [INFO] --- maven-remote-resources-plugin:1.5:process (default) @ > flink-yarn-tests_2.10 --- > [INFO] > [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ > flink-yarn-tests_2.10 --- > [INFO] Using 'UTF-8' encoding to copy filtered resources. > [INFO] skip non existing resourceDirectory > /home/travis/build/apache/flink/flink-yarn-tests/src/main/resources > [INFO] Copying 3 resources > [INFO] > [INFO] --- scala-maven-plugin:3.1.4:compile (scala-compile-first) @ > flink-yarn-tests_2.10 --- > [INFO] No sources to compile > [INFO] > [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ > flink-yarn-tests_2.10 --- > [INFO] No sources to compile > [INFO] > [INFO] --- build-helper-maven-plugin:1.7:add-test-source (add-test-source) @ > flink-yarn-tests_2.10 --- > [INFO] Test Source directory: > /home/travis/build/apache/flink/flink-yarn-tests/src/test/scala added. > [INFO] > [INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ > flink-yarn-tests_2.10 --- > [INFO] Using 'UTF-8' encoding to copy filtered resources. 
> [INFO] Copying 1 resource > [INFO] Copying 3 resources > [INFO] > [INFO] --- scala-maven-plugin:3.1.4:testCompile (scala-test-compile) @ > flink-yarn-tests_2.10 --- > [INFO] /home/travis/build/apache/flink/flink-yarn-tests/src/test/scala:-1: > info: compiling > [INFO] Compiling 2 source files to > /home/travis/build/apache/flink/flink-yarn-tests/target/test-classes at > 1463615798796 > [INFO] prepare-compile in 0 s > [INFO] compile in 9 s > [INFO] > [INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ > flink-yarn-tests_2.10 --- > [INFO] Nothing to compile - all classes are up to date > [INFO] > [INFO] --- maven-surefire-plugin:2.18.1:test (default-test) @ > flink-yarn-tests_2.10 --- > [INFO] Surefire report directory: > /home/travis/build/apache/flink/flink-yarn-tests/target/surefire-reports > [WARNING] The system property log4j.configuration is configured twice! The > property appears in and any of , > or user property. > --- > T E S T S > --- > Results : > Tests run: 0, Failures: 0, Errors: 0, Skipped: 0 > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
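The log above shows the test sources compiling, yet Surefire ends with "Tests run: 0" — consistent with the commit message's explanation that a Maven configuration still redirected the test directory. Purely as an illustration of the kind of leftover override involved (not Flink's actual pom):

```xml
<!-- Illustrative sketch only: a stale override like this points the test
     compiler and Surefire at a directory that no longer contains the tests
     after they were moved, so zero tests are discovered and run. -->
<build>
  <testSourceDirectory>${project.basedir}/src/main/java</testSourceDirectory>
</build>
```

Removing such an override lets Maven's default test-source layout (plus the `build-helper-maven-plugin` additions seen in the log) take effect again.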
[jira] [Updated] (FLINK-3932) Implement State Backend Security
[ https://issues.apache.org/jira/browse/FLINK-3932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eron Wright updated FLINK-3932: Description: _This issue is part of a series of improvements detailed in the [Secure Data Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] design doc._ Flink should protect its HA, checkpoint, and savepoint state against unauthorized access. As described in the design doc, implement: - ZooKeeper authentication w/ Kerberos - ZooKeeper authorization (i.e. znode ACLs) - Checkpoint/savepoint data protection was: _This issue is part of a series of improvements detailed the [Secure Data Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] design doc._ Flink should protect its HA, checkpoint, and savepoint state against unauthorized access. As described in the design doc, implement: - ZooKeeper authentication w/ Kerberos - ZooKeeper authorization (i.e. znode ACLs) - Checkpoint/savepoint data protection > Implement State Backend Security > > > Key: FLINK-3932 > URL: https://issues.apache.org/jira/browse/FLINK-3932 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright > Labels: security > Original Estimate: 336h > Remaining Estimate: 336h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Flink should protect its HA, checkpoint, and savepoint state against > unauthorized access. > As described in the design doc, implement: > - ZooKeeper authentication w/ Kerberos > - ZooKeeper authorization (i.e. znode ACLs) > - Checkpoint/savepoint data protection -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3931) Implement Transport Encryption (SSL/TLS)
[ https://issues.apache.org/jira/browse/FLINK-3931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eron Wright updated FLINK-3931: Description: _This issue is part of a series of improvements detailed in the [Secure Data Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] design doc._ To assure privacy and data integrity between Flink components, enable TLS for all communication channels. As described in the design doc: - Accept a configured certificate or generate a certificate. - Enable Akka SSL - Implement Data Transfer SSL - Implement Blob Server SSL - Implement Web UI HTTPS was: _This issue is part of a series of improvements detailed the [Secure Data Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] design doc._ To assure privacy and data integrity between Flink components, enable TLS for all communication channels. As described in the design doc: - Accept a configured certificate or generate a certificate. - Enable Akka SSL - Implement Data Transfer SSL - Implement Blob Server SSL - Implement Web UI HTTPS > Implement Transport Encryption (SSL/TLS) > > > Key: FLINK-3931 > URL: https://issues.apache.org/jira/browse/FLINK-3931 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright > Labels: security > Original Estimate: 1,008h > Remaining Estimate: 1,008h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > To assure privacy and data integrity between Flink components, enable TLS for > all communication channels. As described in the design doc: > - Accept a configured certificate or generate a certificate. > - Enable Akka SSL > - Implement Data Transfer SSL > - Implement Blob Server SSL > - Implement Web UI HTTPS -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-3930) Implement Service-Level Authorization
[ https://issues.apache.org/jira/browse/FLINK-3930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eron Wright updated FLINK-3930: Description: _This issue is part of a series of improvements detailed in the [Secure Data Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] design doc._ Service-level authorization is the initial authorization mechanism to ensure clients (or servers) connecting to the Flink cluster are authorized to do so. The purpose is to prevent a cluster from being used by an unauthorized user, whether to execute jobs, disrupt cluster functionality, or gain access to secrets stored within the cluster. Implement service-level authorization as described in the design doc. - Introduce a shared secret cookie - Enable Akka security cookie - Implement data transfer authentication - Secure the web dashboard was: _This issue is part of a series of improvements detailed the [Secure Data Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] design doc._ Service-level authorization is the initial authorization mechanism to ensure clients (or servers) connecting to the Flink cluster are authorized to do so. The purpose is to prevent a cluster from being used by an unauthorized user, whether to execute jobs, disrupt cluster functionality, or gain access to secrets stored within the cluster. Implement service-level authorization as described in the design doc. 
- Introduce a shared secret cookie - Enable Akka security cookie - Implement data transfer authentication - Secure the web dashboard > Implement Service-Level Authorization > - > > Key: FLINK-3930 > URL: https://issues.apache.org/jira/browse/FLINK-3930 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright > Labels: security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Service-level authorization is the initial authorization mechanism to ensure > clients (or servers) connecting to the Flink cluster are authorized to do so. > The purpose is to prevent a cluster from being used by an unauthorized > user, whether to execute jobs, disrupt cluster functionality, or gain access > to secrets stored within the cluster. > Implement service-level authorization as described in the design doc. > - Introduce a shared secret cookie > - Enable Akka security cookie > - Implement data transfer authentication > - Secure the web dashboard -- This message was sent by Atlassian JIRA (v6.3.4#6332)
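The "shared secret cookie" item above boils down to both endpoints being configured with the same secret and the server rejecting peers that present a different one. A minimal hedged sketch (not Flink's implementation) using the JDK's constant-time byte comparison:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class CookieCheck {

    // Compare the configured secret with the cookie a connecting peer
    // presented. MessageDigest.isEqual runs in constant time, which avoids
    // leaking information about the secret through timing differences.
    static boolean cookieMatches(String expected, String presented) {
        return MessageDigest.isEqual(
            expected.getBytes(StandardCharsets.UTF_8),
            presented.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(cookieMatches("s3cret", "s3cret"));
        System.out.println(cookieMatches("s3cret", "wrong"));
    }
}
```

A plain `String.equals` would be functionally identical here but short-circuits on the first mismatching byte, which is why a constant-time comparison is the usual choice for secrets.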
[jira] [Updated] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Eron Wright updated FLINK-3929: Description: _This issue is part of a series of improvements detailed in the [Secure Data Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] design doc._ Add support for a keytab credential to be associated with the Flink cluster, to facilitate: - Kerberos-authenticated data access for connectors - Kerberos-authenticated ZooKeeper access Support both the standalone and YARN deployment modes. was: _This issue is part of a series of improvements detailed the [Secure Data Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] design doc._ Add support for a keytab credential to be associated with the Flink cluster, to facilitate: - Kerberos-authenticated data access for connectors - Kerberos-authenticated ZooKeeper access Support both the standalone and YARN deployment modes. > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
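For context, keytab-based Kerberos login in a JVM process (e.g. for the ZooKeeper client mentioned above) is conventionally wired up through a JAAS configuration section like the following. The keytab path and principal are placeholders, not values the issue specifies:

```text
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="/path/to/flink.keytab"
  principal="flink/hostname@EXAMPLE.COM";
};
```

Unlike a ticket obtained via `kinit`, a keytab does not expire with the ticket lifetime, which is what makes it suitable for long-running standalone and YARN deployments.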
[jira] [Commented] (FLINK-3922) Infinite recursion on TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-3922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291351#comment-15291351 ] Flavio Pompermaier commented on FLINK-3922: --- I think that if you can run the code I attached successfully then you can close it :) However, maybe I'd add a unit test for it... > Infinite recursion on TypeExtractor > --- > > Key: FLINK-3922 > URL: https://issues.apache.org/jira/browse/FLINK-3922 > Project: Flink > Issue Type: Bug > Components: Type Serialization System >Affects Versions: 1.0.2 >Reporter: Flavio Pompermaier >Assignee: Timo Walther >Priority: Critical > > This program causes a StackOverflow (infinite recursion) in the TypeExtractor: > {code:title=TypeSerializerStackOverflowOnRecursivePojo.java|borderStyle=solid} > public class TypeSerializerStackOverflowOnRecursivePojo { > public static class RecursivePojo implements Serializable { > private static final long serialVersionUID = 1L; > > private RecursivePojo parent; > public RecursivePojo(){} > public RecursivePojo(K k, V v) { > } > public RecursivePojo getParent() { > return parent; > } > public void setParent(RecursivePojo parent) { > this.parent = parent; > } > > } > public static class TypedTuple extends Tuple3 RecursivePojo >>{ > private static final long serialVersionUID = 1L; > } > > public static void main(String[] args) throws Exception { > ExecutionEnvironment env = > ExecutionEnvironment.getExecutionEnvironment(); > env.fromCollection(Arrays.asList(new RecursivePojo Map >("test",new HashMap ( > .map(t-> {TypedTuple ret = new TypedTuple();ret.setFields("1", > "1", t);return ret;}).returns(TypedTuple.class) > .print(); > } > > } > {code} > The thrown Exception is the following: > {code:title=Exception thrown} > Exception in thread "main" java.lang.StackOverflowError > at > sun.reflect.generics.parser.SignatureParser.parsePackageNameAndSimpleClassTypeSignature(SignatureParser.java:328) > at > 
sun.reflect.generics.parser.SignatureParser.parseClassTypeSignature(SignatureParser.java:310) > at > sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:289) > at > sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:283) > at > sun.reflect.generics.parser.SignatureParser.parseTypeSignature(SignatureParser.java:485) > at > sun.reflect.generics.parser.SignatureParser.parseReturnType(SignatureParser.java:627) > at > sun.reflect.generics.parser.SignatureParser.parseMethodTypeSignature(SignatureParser.java:577) > at > sun.reflect.generics.parser.SignatureParser.parseMethodSig(SignatureParser.java:171) > at > sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:55) > at > sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:43) > at > sun.reflect.generics.repository.AbstractRepository.(AbstractRepository.java:74) > at > sun.reflect.generics.repository.GenericDeclRepository.(GenericDeclRepository.java:49) > at > sun.reflect.generics.repository.ConstructorRepository.(ConstructorRepository.java:51) > at > sun.reflect.generics.repository.MethodRepository.(MethodRepository.java:46) > at > sun.reflect.generics.repository.MethodRepository.make(MethodRepository.java:59) > at java.lang.reflect.Method.getGenericInfo(Method.java:102) > at java.lang.reflect.Method.getGenericReturnType(Method.java:255) > at > org.apache.flink.api.java.typeutils.TypeExtractor.isValidPojoField(TypeExtractor.java:1610) > at > org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1671) > at > org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559) > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732) > at > org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678) > at > 
org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559) > at > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732) > at > org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678) > at >
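The repeating `analyzePojo` → `privateGetForClass` → `createTypeInfoWithTypeHierarchy` frames in the trace above are the signature of a field traversal recursing without bound into a self-referencing POJO. A generic sketch of the cycle guard such a traversal needs — illustrative only, not Flink's actual TypeExtractor code:

```java
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;

public class RecursiveFieldWalker {

    // A self-referencing POJO like the one in the report.
    static class RecursivePojo {
        RecursivePojo parent;
    }

    // Walk the field types of 'clazz', tracking the classes currently on the
    // traversal path; returns how many distinct classes were analyzed.
    static int analyze(Class<?> clazz, Set<Class<?>> inProgress) {
        if (!inProgress.add(clazz)) {
            // Already being analyzed further up the stack: stop here instead
            // of recursing again (a real extractor would fall back to a
            // generic type for this field).
            return 0;
        }
        int analyzed = 1;
        for (Field field : clazz.getDeclaredFields()) {
            Class<?> fieldType = field.getType();
            if (!fieldType.isPrimitive()) {
                analyzed += analyze(fieldType, inProgress);
            }
        }
        inProgress.remove(clazz);
        return analyzed;
    }

    public static void main(String[] args) {
        // Without the 'inProgress' guard this would recurse forever on 'parent'.
        System.out.println(analyze(RecursivePojo.class, new HashSet<>()));
    }
}
```

The guard is removed on the way back out (`inProgress.remove`), so the same class can still be analyzed on independent branches of the type hierarchy; only cycles on the current path are cut.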
[jira] [Created] (FLINK-3938) Yarn tests don't run on the current master
Maximilian Michels created FLINK-3938: - Summary: Yarn tests don't run on the current master Key: FLINK-3938 URL: https://issues.apache.org/jira/browse/FLINK-3938 Project: Flink Issue Type: Bug Components: Build System Affects Versions: 1.1.0 Reporter: Maximilian Michels Assignee: Maximilian Michels Priority: Critical Fix For: 1.1.0 Independently of FLINK-3909, I just discovered that the Yarn tests don't run on the current master (09b428b). {noformat} [INFO] [INFO] Building flink-yarn-tests 1.1-SNAPSHOT [INFO] [INFO] [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ flink-yarn-tests_2.10 --- [INFO] [INFO] --- maven-checkstyle-plugin:2.16:check (validate) @ flink-yarn-tests_2.10 --- [INFO] [INFO] --- maven-enforcer-plugin:1.3.1:enforce (enforce-maven) @ flink-yarn-tests_2.10 --- [INFO] [INFO] --- build-helper-maven-plugin:1.7:add-source (add-source) @ flink-yarn-tests_2.10 --- [INFO] Source directory: /home/travis/build/apache/flink/flink-yarn-tests/src/main/scala added. [INFO] [INFO] --- maven-remote-resources-plugin:1.5:process (default) @ flink-yarn-tests_2.10 --- [INFO] [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ flink-yarn-tests_2.10 --- [INFO] Using 'UTF-8' encoding to copy filtered resources. [INFO] skip non existing resourceDirectory /home/travis/build/apache/flink/flink-yarn-tests/src/main/resources [INFO] Copying 3 resources [INFO] [INFO] --- scala-maven-plugin:3.1.4:compile (scala-compile-first) @ flink-yarn-tests_2.10 --- [INFO] No sources to compile [INFO] [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ flink-yarn-tests_2.10 --- [INFO] No sources to compile [INFO] [INFO] --- build-helper-maven-plugin:1.7:add-test-source (add-test-source) @ flink-yarn-tests_2.10 --- [INFO] Test Source directory: /home/travis/build/apache/flink/flink-yarn-tests/src/test/scala added. 
[INFO] [INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ flink-yarn-tests_2.10 --- [INFO] Using 'UTF-8' encoding to copy filtered resources. [INFO] Copying 1 resource [INFO] Copying 3 resources [INFO] [INFO] --- scala-maven-plugin:3.1.4:testCompile (scala-test-compile) @ flink-yarn-tests_2.10 --- [INFO] /home/travis/build/apache/flink/flink-yarn-tests/src/test/scala:-1: info: compiling [INFO] Compiling 2 source files to /home/travis/build/apache/flink/flink-yarn-tests/target/test-classes at 1463615798796 [INFO] prepare-compile in 0 s [INFO] compile in 9 s [INFO] [INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ flink-yarn-tests_2.10 --- [INFO] Nothing to compile - all classes are up to date [INFO] [INFO] --- maven-surefire-plugin:2.18.1:test (default-test) @ flink-yarn-tests_2.10 --- [INFO] Surefire report directory: /home/travis/build/apache/flink/flink-yarn-tests/target/surefire-reports [WARNING] The system property log4j.configuration is configured twice! The property appears in and any of , or user property. --- T E S T S --- Results : Tests run: 0, Failures: 0, Errors: 0, Skipped: 0 {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291312#comment-15291312 ] ASF GitHub Bot commented on FLINK-3780: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63901325 --- Diff: flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library.similarity; + +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.Utils.ChecksumHashCode; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.AsmTestBase; +import org.apache.flink.graph.generator.RMatGraph; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.graph.library.similarity.JaccardIndex.Result; +import org.apache.flink.test.util.TestBaseUtils; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class JaccardIndexTest +extends AsmTestBase { + + @Test + public void testSimpleGraph() + throws Exception { + DataSet ji = undirectedSimpleGraph + .run(new JaccardIndex ()); + + String expectedResult = + "(0,1,(1,4))\n" + + "(0,2,(1,4))\n" + + "(0,3,(2,4))\n" + + "(1,2,(2,4))\n" + + "(1,3,(1,6))\n" + + "(1,4,(1,3))\n" + + "(1,5,(1,3))\n" + + "(2,3,(1,6))\n" + + "(2,4,(1,3))\n" + + "(2,5,(1,3))\n" + + "(4,5,(1,1))\n"; + + TestBaseUtils.compareResultAsText(ji.collect(), expectedResult); + } + + @Test + public void testSimpleGraphWithMinimumScore() + throws Exception { + DataSet ji = undirectedSimpleGraph + .run(new JaccardIndex () + .setMinimumScore(1, 2)); + + String expectedResult = + "(0,3,(2,4))\n" + + "(1,2,(2,4))\n" + + "(4,5,(1,1))\n"; + + TestBaseUtils.compareResultAsText(ji.collect(), expectedResult); + } + + @Test + public void testSimpleGraphWithMaximumScore() + throws Exception { + DataSet ji = undirectedSimpleGraph + .run(new JaccardIndex () + .setMaximumScore(1, 2)); + + String expectedResult = + "(0,1,(1,4))\n" + + "(0,2,(1,4))\n" + + "(1,3,(1,6))\n" + + "(1,4,(1,3))\n" + + 
"(1,5,(1,3))\n" + + "(2,3,(1,6))\n" + + "(2,4,(1,3))\n" + + "(2,5,(1,3))\n"; + + TestBaseUtils.compareResultAsText(ji.collect(), expectedResult); + } + + @Test + public void testCompleteGraph() + throws Exception { + DataSet ji = completeGraph + .run(new JaccardIndex () + .setGroupSize(4)); + + for (Result result : ji.collect()) {
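The expected scores in the tests above, e.g. `(0,3,(2,4))`, are (shared neighbors, distinct neighbors) pairs — the intersection and union sizes of the two vertices' neighborhoods, whose ratio is the Jaccard index. A standalone sketch of that computation (illustrative only, not Gelly's actual implementation):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class Jaccard {

    // Returns {sharedNeighbors, distinctNeighbors} for two neighbor sets.
    static int[] score(Set<Integer> a, Set<Integer> b) {
        Set<Integer> intersection = new HashSet<>(a);
        intersection.retainAll(b);
        // |A union B| = |A| + |B| - |A intersect B|
        int union = a.size() + b.size() - intersection.size();
        return new int[] {intersection.size(), union};
    }

    public static void main(String[] args) {
        // Hypothetical neighborhoods of two vertices; not taken from the
        // test graph above, whose edge list is not shown here.
        Set<Integer> n0 = new HashSet<>(Arrays.asList(1, 2, 3));
        Set<Integer> n1 = new HashSet<>(Arrays.asList(0, 2, 3));
        int[] s = score(n0, n1);
        System.out.println(s[0] + "/" + s[1]); // shared / distinct neighbors
    }
}
```

Keeping the raw (intersection, union) pair instead of the divided score is what lets the library filter with exact fractions, as `setMinimumScore(1, 2)` does above.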
[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63901325 --- Diff: flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java --- @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[quoted diff identical to the one above]
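`setMinimumScore(1, 2)` and `setMaximumScore(1, 2)` in the tests above keep only pairs whose score share/union falls in the configured range, and the (numerator, denominator) form lets the comparison stay in integer arithmetic. A hedged sketch of that filtering; the boundary semantics (minimum inclusive, maximum exclusive, since `(0,3,(2,4))` with score exactly 1/2 survives the minimum filter but not the maximum one) are inferred from the expected outputs, not from the Gelly javadoc.

```java
// Fraction comparison without floating point: share/union vs. num/den.
// Boundary behavior is an inference from the test expectations above.
public class ScoreFilterSketch {
	// true iff share/union >= minNum/minDen (minimum filter, inclusive)
	public static boolean atLeast(int share, int union, int minNum, int minDen) {
		return share * minDen >= minNum * union;
	}

	// true iff share/union < maxNum/maxDen (maximum filter, exclusive)
	public static boolean below(int share, int union, int maxNum, int maxDen) {
		return share * maxDen < maxNum * union;
	}

	public static void main(String[] args) {
		// (0,3,(2,4)) survives setMinimumScore(1, 2); (0,1,(1,4)) does not.
		System.out.println(atLeast(2, 4, 1, 2));  // true
		System.out.println(atLeast(1, 4, 1, 2));  // false
		// (0,1,(1,4)) survives setMaximumScore(1, 2); (4,5,(1,1)) does not.
		System.out.println(below(1, 4, 1, 2));    // true
		System.out.println(below(1, 1, 1, 2));    // false
	}
}
```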
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291289#comment-15291289 ] ASF GitHub Bot commented on FLINK-3780: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63899403 --- Diff: flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java --- @@ -0,0 +1,136 @@ [Apache license header omitted; identical to the first quote above]
[quoted diff identical to the one above]
[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63899403 --- Diff: flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java --- @@ -0,0 +1,136 @@ [Apache license header omitted; identical to the first quote above]
[quoted diff identical to the one above]
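testCompleteGraph() above relies on a property of complete graphs: for any pair (u, v), the distinct neighborhood N(u) ∪ N(v) covers all n vertices (u is a neighbor of v and vice versa), while the shared neighborhood N(u) ∩ N(v) excludes only u and v themselves, giving n − 2. A small sketch verifying this by brute force; the vertex count 5 is an arbitrary choice, not the fixture's value.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Brute-force check of the complete-graph neighborhood counts asserted
// via getDistinctNeighborCount() in testCompleteGraph().
public class CompleteGraphSketch {
	// Returns {sharedNeighbors, distinctNeighbors} for pair (u, v) in K_n.
	public static int[] counts(int n, int u, int v) {
		Set<Integer> union = new HashSet<>();
		Set<Integer> shared = new HashSet<>();
		for (int w = 0; w < n; w++) {
			// complete-graph adjacency: every vertex neighbors all others
			boolean nbrU = (w != u);
			boolean nbrV = (w != v);
			if (nbrU || nbrV) union.add(w);
			if (nbrU && nbrV) shared.add(w);
		}
		return new int[] {shared.size(), union.size()};
	}

	public static void main(String[] args) {
		// For K_5: shared = n - 2 = 3, distinct = n = 5, for every pair.
		System.out.println(Arrays.toString(counts(5, 0, 1)));  // [3, 5]
	}
}
```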
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291281#comment-15291281 ] ASF GitHub Bot commented on FLINK-3780: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63898835 --- Diff: flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java --- @@ -0,0 +1,136 @@ [Apache license header omitted; identical to the first quote above]
[quoted diff identical to the one above]
[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63898835 --- Diff: flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java --- @@ -0,0 +1,136 @@ [Apache license header omitted; identical to the first quote above]
[quoted diff identical to the one above]
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291259#comment-15291259 ] ASF GitHub Bot commented on FLINK-3780: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63896459 --- Diff: flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java --- @@ -0,0 +1,136 @@ [Apache license header omitted; identical to the first quote above]
[quoted diff identical to the one above]
[jira] [Commented] (FLINK-3922) Infinite recursion on TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-3922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291260#comment-15291260 ] Timo Walther commented on FLINK-3922: - [~f.pompermaier] I have opened a PR. Can you confirm that it solves your problem?
> Infinite recursion on TypeExtractor
> ---
>
> Key: FLINK-3922
> URL: https://issues.apache.org/jira/browse/FLINK-3922
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
> Affects Versions: 1.0.2
> Reporter: Flavio Pompermaier
> Assignee: Timo Walther
> Priority: Critical
>
> This program causes a StackOverflow (infinite recursion) in the TypeExtractor:
> {code:title=TypeSerializerStackOverflowOnRecursivePojo.java|borderStyle=solid}
> public class TypeSerializerStackOverflowOnRecursivePojo {
>     public static class RecursivePojo<K, V> implements Serializable {
>         private static final long serialVersionUID = 1L;
>
>         private RecursivePojo<K, V> parent;
>         public RecursivePojo() {}
>         public RecursivePojo(K k, V v) {
>         }
>         public RecursivePojo<K, V> getParent() {
>             return parent;
>         }
>         public void setParent(RecursivePojo<K, V> parent) {
>             this.parent = parent;
>         }
>     }
>     public static class TypedTuple extends Tuple3<String, String, RecursivePojo<String, Map<String, String>>> {
>         private static final long serialVersionUID = 1L;
>     }
>
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         env.fromCollection(Arrays.asList(new RecursivePojo<String, Map<String, String>>("test", new HashMap<String, String>())))
>             .map(t -> {TypedTuple ret = new TypedTuple(); ret.setFields("1", "1", t); return ret;}).returns(TypedTuple.class)
>             .print();
>     }
> }
> {code}
> The thrown Exception is the following:
> {code:title=Exception thrown}
> Exception in thread "main" java.lang.StackOverflowError
> 	at sun.reflect.generics.parser.SignatureParser.parsePackageNameAndSimpleClassTypeSignature(SignatureParser.java:328)
> 	at sun.reflect.generics.parser.SignatureParser.parseClassTypeSignature(SignatureParser.java:310)
> 	at sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:289)
> 	at sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:283)
> 	at sun.reflect.generics.parser.SignatureParser.parseTypeSignature(SignatureParser.java:485)
> 	at sun.reflect.generics.parser.SignatureParser.parseReturnType(SignatureParser.java:627)
> 	at sun.reflect.generics.parser.SignatureParser.parseMethodTypeSignature(SignatureParser.java:577)
> 	at sun.reflect.generics.parser.SignatureParser.parseMethodSig(SignatureParser.java:171)
> 	at sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:55)
> 	at sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:43)
> 	at sun.reflect.generics.repository.AbstractRepository.<init>(AbstractRepository.java:74)
> 	at sun.reflect.generics.repository.GenericDeclRepository.<init>(GenericDeclRepository.java:49)
> 	at sun.reflect.generics.repository.ConstructorRepository.<init>(ConstructorRepository.java:51)
> 	at sun.reflect.generics.repository.MethodRepository.<init>(MethodRepository.java:46)
> 	at sun.reflect.generics.repository.MethodRepository.make(MethodRepository.java:59)
> 	at java.lang.reflect.Method.getGenericInfo(Method.java:102)
> 	at java.lang.reflect.Method.getGenericReturnType(Method.java:255)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.isValidPojoField(TypeExtractor.java:1610)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1671)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1559)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:732)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1678)
> 	at
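The stack trace above cycles between analyzePojo and createTypeInfoWithTypeHierarchy because the extractor re-enters the same POJO class through its self-typed `parent` field. A toy sketch of that failure mode and the standard remedy, a visited set that breaks the cycle; this illustrates the idea only and is not Flink's actual fix.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy illustration of the FLINK-3922 failure mode: naively recursing into a
// POJO whose field type is the POJO itself never terminates. Tracking
// already-visited classes breaks the cycle.
public class RecursiveFieldWalker {
	public static class RecursivePojo {
		RecursivePojo parent;  // self-referential field triggers the cycle
	}

	// Collects "Class.field: Type" entries, skipping classes already seen.
	public static List<String> walk(Class<?> clazz, Set<Class<?>> seen, List<String> out) {
		if (!seen.add(clazz)) {
			return out;  // cycle detected: stop instead of recursing forever
		}
		for (Field f : clazz.getDeclaredFields()) {
			out.add(clazz.getSimpleName() + "." + f.getName() + ": " + f.getType().getSimpleName());
			if (!f.getType().isPrimitive()) {
				walk(f.getType(), seen, out);
			}
		}
		return out;
	}

	public static void main(String[] args) {
		List<String> fields = walk(RecursivePojo.class, new HashSet<>(), new ArrayList<>());
		// one entry, because the recursive visit of RecursivePojo is cut off
		System.out.println(fields);  // [RecursivePojo.parent: RecursivePojo]
	}
}
```

Without the `seen.add` guard, the recursive call on the `parent` field's type would re-enter `walk(RecursivePojo.class, ...)` unboundedly, which is the same shape as the StackOverflowError above.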
[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63896459 --- Diff: flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java --- @@ -0,0 +1,136 @@ [Apache license header omitted; identical to the first quote above]
[quoted diff identical to the one above]
[jira] [Commented] (FLINK-3922) Infinite recursion on TypeExtractor
[ https://issues.apache.org/jira/browse/FLINK-3922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291251#comment-15291251 ] ASF GitHub Bot commented on FLINK-3922: --- GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/2011 [FLINK-3922] [types] Infinite recursion on TypeExtractor - [x] General - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text") - The pull request addresses only one issue - Each commit in the PR has a meaningful commit message (including the JIRA id) - [x] Documentation - Documentation has been added for new functionality - Old documentation affected by the pull request has been updated - JavaDoc for public methods has been added - [x] Tests & Build - Functionality added by the pull request is covered by tests - `mvn clean verify` has been executed successfully locally or a Travis build has passed Fixes a bug that occurs when a parameterized POJO contains a recursive field of the same type. You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-3922 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2011.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2011 commit 91b988a240f7a71528ea16c3ffb5d605e0d920de Author: twalthr Date: 2016-05-19T15:01:57Z [FLINK-3922] [types] Infinite recursion on TypeExtractor
> Infinite recursion on TypeExtractor
> -----------------------------------
>
> Key: FLINK-3922
> URL: https://issues.apache.org/jira/browse/FLINK-3922
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
> Affects Versions: 1.0.2
> Reporter: Flavio Pompermaier
> Assignee: Timo Walther
> Priority: Critical
>
> This program causes a StackOverflowError (infinite recursion) in the TypeExtractor:
> {code:title=TypeSerializerStackOverflowOnRecursivePojo.java|borderStyle=solid}
> public class TypeSerializerStackOverflowOnRecursivePojo {
>
>     public static class RecursivePojo<K, V> implements Serializable {
>         private static final long serialVersionUID = 1L;
>
>         private RecursivePojo<K, V> parent;
>
>         public RecursivePojo() {}
>
>         public RecursivePojo(K k, V v) {
>         }
>
>         public RecursivePojo<K, V> getParent() {
>             return parent;
>         }
>
>         public void setParent(RecursivePojo<K, V> parent) {
>             this.parent = parent;
>         }
>     }
>
>     public static class TypedTuple extends Tuple3<String, String, RecursivePojo<String, Map<String, String>>> {
>         private static final long serialVersionUID = 1L;
>     }
>
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         env.fromCollection(Arrays.asList(new RecursivePojo<String, Map<String, String>>("test", new HashMap<String, String>())))
>             .map(t -> { TypedTuple ret = new TypedTuple(); ret.setFields("1", "1", t); return ret; }).returns(TypedTuple.class)
>             .print();
>     }
> }
> {code}
> The thrown exception is the following:
> {code:title=Exception thrown}
> Exception in thread "main" java.lang.StackOverflowError
> at sun.reflect.generics.parser.SignatureParser.parsePackageNameAndSimpleClassTypeSignature(SignatureParser.java:328)
> at sun.reflect.generics.parser.SignatureParser.parseClassTypeSignature(SignatureParser.java:310)
> at sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:289)
> at sun.reflect.generics.parser.SignatureParser.parseFieldTypeSignature(SignatureParser.java:283)
> at sun.reflect.generics.parser.SignatureParser.parseTypeSignature(SignatureParser.java:485)
> at sun.reflect.generics.parser.SignatureParser.parseReturnType(SignatureParser.java:627)
> at sun.reflect.generics.parser.SignatureParser.parseMethodTypeSignature(SignatureParser.java:577)
> at sun.reflect.generics.parser.SignatureParser.parseMethodSig(SignatureParser.java:171)
> at sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:55)
> at sun.reflect.generics.repository.ConstructorRepository.parse(ConstructorRepository.java:43)
> at sun.reflect.generics.repository.AbstractRepository.<init>(AbstractRepository.java:74)
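The failure mode above is a type analysis that follows `RecursivePojo.parent`, finds another `RecursivePojo`, and descends again without end. A minimal sketch of the kind of guard such an analysis needs: track the types currently on the analysis path and treat a re-encountered type as opaque. This is only an illustration of the idea; it is not the actual TypeExtractor fix, and `Node` and `countFields` are hypothetical names.

```java
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;

public class RecursionGuardSketch {

    // A POJO whose field refers back to its own type, like RecursivePojo above.
    static class Node {
        Node parent;
        String name;
    }

    // Naive field analysis recurses forever on Node.parent. Tracking the types
    // currently being analyzed breaks the cycle: a re-encountered type is
    // treated as opaque instead of being descended into again.
    static int countFields(Class<?> clazz, Set<Class<?>> inProgress) {
        if (!inProgress.add(clazz)) {
            return 0; // type is already on the analysis path: stop recursing
        }
        int count = 0;
        for (Field field : clazz.getDeclaredFields()) {
            count++;
            Class<?> type = field.getType();
            if (!type.isPrimitive() && !type.getName().startsWith("java.")) {
                count += countFields(type, inProgress);
            }
        }
        inProgress.remove(clazz);
        return count;
    }

    public static void main(String[] args) {
        // Terminates instead of throwing StackOverflowError: prints 2
        // (fields `parent` and `name`; the self-reference is not re-entered).
        System.out.println(countFields(Node.class, new HashSet<>()));
    }
}
```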
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291243#comment-15291243 ] ASF GitHub Bot commented on FLINK-3780: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63895157 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java --- @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library.similarity; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.operators.Order; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.functions.FunctionAnnotation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree; +import org.apache.flink.graph.library.similarity.JaccardIndex.Result; +import org.apache.flink.graph.utils.Murmur3_32; +import org.apache.flink.types.CopyableValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.util.Collector; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.List; + +/** + * The Jaccard Index measures the similarity between vertex neighborhoods. + * Scores range from 0.0 (no common neighbors) to 1.0 (all neighbors are common). + * + * This implementation produces similarity scores for each pair of vertices + * in the graph with at least one common neighbor; equivalently, this is the + * set of all non-zero Jaccard Similarity coefficients. + * + * The input graph must be a simple, undirected graph containing no duplicate + * edges or self-loops. 
+ * + * @param <K> graph ID type + * @param <VV> vertex value type + * @param <EV> edge value type + */ +public class JaccardIndex<K extends CopyableValue<K>, VV, EV> +implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> { + + public static final int DEFAULT_GROUP_SIZE = 64; + + // Optional configuration + private int groupSize = DEFAULT_GROUP_SIZE; + + private boolean unboundedScores = true; + + private int minimumScoreNumerator = 0; + + private int minimumScoreDenominator = 1; + + private int maximumScoreNumerator = 1; + + private int maximumScoreDenominator = 0; + + private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN; + + /** +* Override the default group size for the quadratic expansion of neighbor +* pairs. Small groups generate more data whereas large groups distribute +* computation less evenly among tasks. +* +* @param groupSize the group size for the quadratic expansion of neighbor pairs +* @return this +*/ + public JaccardIndex<K, VV, EV> setGroupSize(int groupSize) { + Preconditions.checkArgument(groupSize > 0, "Group size must be greater than zero"); + + this.groupSize = groupSize; + + return this; + } + + /** +* Filter out Jaccard Index scores less than the given minimum fraction. +* +* @param numerator numerator of the minimum score +* @param denominator denominator of the minimum score +* @return this +* @see #setMaximumScore(int, int) +*/ + public JaccardIndex<K, VV, EV> setMinimumScore(int numerator, int denominator) { + Preconditions.checkArgument(numerator >=
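For reference, the score described in the Javadoc above is |N(u) ∩ N(v)| / |N(u) ∪ N(v)| over two vertices' neighbor sets. A standalone sketch of that computation in plain Java, independent of the Gelly operators (class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class JaccardExample {

    // Jaccard index of two neighborhoods: |A ∩ B| / |A ∪ B|.
    static double jaccard(Set<Long> a, Set<Long> b) {
        Set<Long> shared = new HashSet<>(a);
        shared.retainAll(b); // shared = A ∩ B
        // |A ∪ B| = |A| + |B| - |A ∩ B|
        int distinct = a.size() + b.size() - shared.size();
        return distinct == 0 ? 0.0 : (double) shared.size() / distinct;
    }

    public static void main(String[] args) {
        Set<Long> n0 = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        Set<Long> n1 = new HashSet<>(Arrays.asList(2L, 3L, 4L));
        System.out.println(jaccard(n0, n1)); // 2 shared / 4 distinct = 0.5
    }
}
```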
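Note that the score bounds in the class above are stored as integer numerator/denominator pairs, presumably so the filter can compare shared/distinct against numerator/denominator by cross-multiplication rather than floating-point division. A hedged sketch of that comparison (names are illustrative, not the library's internals):

```java
public class ScoreFilter {

    // score = shared / distinct; threshold = numerator / denominator.
    // With all values non-negative and distinct, denominator > 0:
    //   shared/distinct >= numerator/denominator
    //   <=> shared * denominator >= numerator * distinct
    // Cross-multiplying in long arithmetic avoids rounding error entirely.
    static boolean meetsMinimum(long shared, long distinct, int numerator, int denominator) {
        return shared * denominator >= (long) numerator * distinct;
    }

    public static void main(String[] args) {
        System.out.println(meetsMinimum(1, 3, 1, 3)); // exactly at threshold: true
        System.out.println(meetsMinimum(1, 3, 1, 2)); // 1/3 < 1/2: false
    }
}
```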
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291232#comment-15291232 ] ASF GitHub Bot commented on FLINK-3780: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63894471 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java --- (quoted diff context identical to the comment above; omitted)
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291228#comment-15291228 ] ASF GitHub Bot commented on FLINK-3780: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63894348 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java --- (quoted diff context identical to the comment above; omitted)
[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63894239 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java --- (quoted diff context identical to the comment above; omitted) + public JaccardIndex<K, VV, EV> setMinimumScore(int numerator, int denominator) { --- End diff -- This should be documented in the library usage docs too.
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291227#comment-15291227 ] ASF GitHub Bot commented on FLINK-3780: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63894239 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java --- End diff -- This should be documented in the library usage docs too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63894069 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java --- @@ -0,0 +1,462 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.library.similarity; + +public class JaccardIndex&lt;K extends CopyableValue&lt;K&gt;, VV, EV&gt; +implements GraphAlgorithm&lt;K, VV, EV, DataSet&lt;Result&lt;K&gt;&gt;&gt; { + + /** + * Override the default group size for the quadratic expansion of neighbor + * pairs. Small groups generate more data whereas large groups distribute + * computation less evenly among tasks. + * + * @param groupSize the group size for the quadratic expansion of neighbor pairs + * @return this + */ + public JaccardIndex&lt;K, VV, EV&gt; setGroupSize(int groupSize) { --- End diff -- Could you maybe extend the description of what groupSize does? And it would be nice to add documentation for this method in the library docs. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
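For intuition, the quadratic expansion that `groupSize` controls can be sketched in plain Java. This is an illustrative sketch only, not the Gelly operator; the class and method names are invented for the example:

```java
import java.util.ArrayList;
import java.util.List;

public class NeighborPairExpansion {

    // Emit every unordered pair of a vertex's neighbors. Each pair shares
    // this vertex as a common neighbor, so downstream the pairs are grouped
    // and counted. A vertex of degree d emits d*(d-1)/2 pairs, which is why
    // large neighbor lists are split into groups of at most `groupSize`:
    // smaller groups emit more (duplicated) data but spread the quadratic
    // work more evenly across reduce tasks.
    static List<int[]> pairs(int[] neighbors) {
        List<int[]> out = new ArrayList<>();
        for (int i = 0; i < neighbors.length; i++) {
            for (int j = i + 1; j < neighbors.length; j++) {
                out.add(new int[]{neighbors[i], neighbors[j]});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // degree 4 => 4*3/2 = 6 candidate pairs
        System.out.println(pairs(new int[]{1, 2, 3, 4}).size()); // prints 6
    }
}
```

This also illustrates the reviewer's point: the output volume grows quadratically in vertex degree, so the group size trades data duplication against skew.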
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291225#comment-15291225 ] ASF GitHub Bot commented on FLINK-3780: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63894069 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java --- End diff -- Could you maybe extend the description of what groupSize does? And it would be nice to add documentation for this method in the library docs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291205#comment-15291205 ] ASF GitHub Bot commented on FLINK-3780: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63892035 --- Diff: docs/apis/batch/libs/gelly.md --- @@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly edge) attributed graph as i vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each vertex and edge in the output graph stores the common group value and the number of represented elements. +### Jaccard Index + + Overview +The Jaccard Index measures the similarity between vertex neighborhoods. Scores range from 0.0 (no common neighbors) to +1.0 (all neighbors are common). + + Details +Counting common neighbors for pairs of vertices is equivalent to counting the two-paths consisting of two edges --- End diff -- I've also seen triad to mean any three vertices and triplet to mean any three connected vertices. I'll rework this section. > Jaccard Similarity > -- > > Key: FLINK-3780 > URL: https://issues.apache.org/jira/browse/FLINK-3780 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Implement a Jaccard Similarity algorithm computing all non-zero similarity > scores. This algorithm is similar to {{TriangleListing}} but instead of > joining two-paths against an edge list we count two-paths. > {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which > relies on {{Graph.getTriplets()}} so only computes similarity scores for > neighbors but not neighbors-of-neighbors. > This algorithm is easily modified for other similarity scores such as > Adamic-Adar similarity where the sum of endpoint degrees is replaced by the > degree of the middle vertex. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
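The degree-sum identity in the Details section above — distinct neighbors equals deg(u) + deg(v) minus the common-neighbor count, because common neighbors appear once in each degree — can be checked with a small stand-alone sketch (a hypothetical helper, not the Gelly code):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class JaccardSketch {

    // Jaccard index = |N(u) ∩ N(v)| / |N(u) ∪ N(v)|, where the union size
    // is computed as deg(u) + deg(v) - |N(u) ∩ N(v)| because the common
    // neighbors are double-counted in the sum of degrees.
    static double jaccard(Set<Integer> u, Set<Integer> v) {
        Set<Integer> common = new HashSet<>(u);
        common.retainAll(v);
        int distinct = u.size() + v.size() - common.size();
        return distinct == 0 ? 0.0 : (double) common.size() / distinct;
    }

    public static void main(String[] args) {
        Set<Integer> u = new HashSet<>(Arrays.asList(1, 2, 3));
        Set<Integer> v = new HashSet<>(Arrays.asList(2, 3, 4));
        // 2 common neighbors, 3 + 3 - 2 = 4 distinct => 0.5
        System.out.println(jaccard(u, v)); // prints 0.5
    }
}
```

The library implementation never materializes neighborhood sets per pair; it derives the same quantities from annotated degrees and two-path counts, which is what makes it scalable.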
[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63893564 --- Diff: docs/apis/batch/libs/gelly.md --- @@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly edge) attributed graph as i vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each vertex and edge in the output graph stores the common group value and the number of represented elements. +### Jaccard Index + + Overview +The Jaccard Index measures the similarity between vertex neighborhoods. Scores range from 0.0 (no common neighbors) to +1.0 (all neighbors are common). + + Details +Counting common neighbors for pairs of vertices is equivalent to counting the two-paths consisting of two edges +connecting the two vertices to the common neighbor. The number of distinct neighbors for pairs of vertices is computed +by storing the sum of degrees of the vertex pair and subtracting the count of common neighbors, which are double-counted +in the sum of degrees. + +The algorithm first annotates each edge with the endpoint degree. Grouping on the midpoint vertex, each pair of +neighbors is emitted with the endpoint degree sum. Grouping on two-paths, the common neighbors are counted. + + Usage +The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing two vertex IDs, +the number of common neighbors, and the number of distinct neighbors. The graph ID type must be `Comparable` and --- End diff -- Ah great! Can you add this here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291219#comment-15291219 ] ASF GitHub Bot commented on FLINK-3780: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63893564 --- Diff: docs/apis/batch/libs/gelly.md --- End diff -- Ah great! Can you add this here?
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63893365 --- Diff: docs/apis/batch/libs/gelly.md --- @@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly edge) attributed graph as i vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each vertex and edge in the output graph stores the common group value and the number of represented elements. +### Jaccard Index + + Overview +The Jaccard Index measures the similarity between vertex neighborhoods. Scores range from 0.0 (no common neighbors) to +1.0 (all neighbors are common). + + Details +Counting common neighbors for pairs of vertices is equivalent to counting the two-paths consisting of two edges +connecting the two vertices to the common neighbor. The number of distinct neighbors for pairs of vertices is computed +by storing the sum of degrees of the vertex pair and subtracting the count of common neighbors, which are double-counted +in the sum of degrees. + +The algorithm first annotates each edge with the endpoint degree. Grouping on the midpoint vertex, each pair of +neighbors is emitted with the endpoint degree sum. Grouping on two-paths, the common neighbors are counted. + + Usage +The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing two vertex IDs, +the number of common neighbors, and the number of distinct neighbors. The graph ID type must be `Comparable` and --- End diff -- It does, from `Result.getJaccardIndexScore()`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291217#comment-15291217 ] ASF GitHub Bot commented on FLINK-3780: --- Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63893365 --- Diff: docs/apis/batch/libs/gelly.md --- End diff -- It does, from `Result.getJaccardIndexScore()`.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291216#comment-15291216 ] ASF GitHub Bot commented on FLINK-3780: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63893177 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/TranslateEdgeDegreeToIntValue.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.asm.degree.annotate; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.asm.translate.TranslateFunction; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; + +/** + * Translate the edge degree returned by the degree annotation functions from + * {@link LongValue} to {@link IntValue}. + * + * @param &lt;EV&gt; edge value type + */ +public class TranslateEdgeDegreeToIntValue&lt;EV&gt; --- End diff -- This is a general translator from `LongValue` to `IntValue`, not just for edge degrees, right? Maybe we could rename it to demonstrate that?
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity
Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63893177 --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/TranslateEdgeDegreeToIntValue.java --- End diff -- This is a general translator from `LongValue` to `IntValue`, not just for edge degrees, right? Maybe we could rename it to demonstrate that? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
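The core of a LongValue-to-IntValue translator, as discussed in the review comment above, is a checked narrowing cast. A plain-Java sketch of that logic (the class name, method name, and exception message here are illustrative, not the actual Gelly code):

```java
public class LongToIntTranslation {

    // Narrow a long degree count to int, failing loudly on overflow rather
    // than silently truncating the high bits of the value.
    static int toIntOrThrow(long value) {
        if (value < Integer.MIN_VALUE || value > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Value out of int range: " + value);
        }
        return (int) value;
    }

    public static void main(String[] args) {
        System.out.println(toIntOrThrow(42L)); // prints 42
    }
}
```

Nothing in this logic is specific to edge degrees, which supports the suggestion to give the class a more general name.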
[jira] [Updated] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
[ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sebastian Klemke updated FLINK-3937: Attachment: improve_flink_cli_yarn_integration.patch Patch that implements -yid option. Applies cleanly to git rev c71675f > Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters > -- > > Key: FLINK-3937 > URL: https://issues.apache.org/jira/browse/FLINK-3937 > Project: Flink > Issue Type: Improvement >Reporter: Sebastian Klemke >Priority: Trivial > Attachments: improve_flink_cli_yarn_integration.patch > > > Currently, flink cli can't figure out JobManager RPC location for > Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop > subcommands are hard to invoke if you only know the YARN application ID. As > an improvement, I suggest adding a -yid option to the > mentioned subcommands that can be used together with -m yarn-cluster. Flink > cli would then retrieve JobManager RPC location from YARN ResourceManager. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291213#comment-15291213 ] ASF GitHub Bot commented on FLINK-3780: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63892449 --- Diff: flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.examples; + +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvOutputFormat; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.translate.LongValueToIntValue; +import org.apache.flink.graph.asm.translate.TranslateGraphIds; +import org.apache.flink.graph.generator.RMatGraph; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import java.text.NumberFormat; + +/** + * Driver for the library implementation of Jaccard Index. + * + * This example generates an undirected RMat graph with the given scale and + * edge factor then calculates all non-zero Jaccard Index similarity scores + * between vertices. 
+ * + * @see org.apache.flink.graph.library.similarity.JaccardIndex + */ +public class JaccardIndex { + + public static final int DEFAULT_SCALE = 10; + + public static final int DEFAULT_EDGE_FACTOR = 16; + + public static final boolean DEFAULT_CLIP_AND_FLIP = true; + + public static void main(String[] args) throws Exception { + // Set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.getConfig().enableObjectReuse(); + + ParameterTool parameters = ParameterTool.fromArgs(args); + + // Generate RMat graph + int scale = parameters.getInt("scale", DEFAULT_SCALE); + int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR); + + RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); + + long vertexCount = 1L << scale; + long edgeCount = vertexCount * edgeFactor; + + boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP); + + Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .setSimpleGraph(true, clipAndFlip) + .generate(); + + DataSet js; + + if (scale > 32) { + js = graph + .run(new org.apache.flink.graph.library.similarity.JaccardIndex()); + } else { + js = graph + .run(new TranslateGraphIds(new LongValueToIntValue())) + .run(new org.apache.flink.graph.library.similarity.JaccardIndex()); + } + + switch (parameters.get("output", "")) { --- End diff -- Can you add a description for these output parameter options in the class Javadoc? > Jaccard Similarity > -- > > Key: FLINK-3780 > URL:
[jira] [Created] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
Sebastian Klemke created FLINK-3937: --- Summary: Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters Key: FLINK-3937 URL: https://issues.apache.org/jira/browse/FLINK-3937 Project: Flink Issue Type: Improvement Reporter: Sebastian Klemke Priority: Trivial Currently, flink cli can't figure out JobManager RPC location for Flink-on-YARN clusters. Therefore, list, savepoint, cancel and stop subcommands are hard to invoke if you only know the YARN application ID. As an improvement, I suggest adding a -yid option to the mentioned subcommands that can be used together with -m yarn-cluster. Flink cli would then retrieve JobManager RPC location from YARN ResourceManager. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291206#comment-15291206 ] ASF GitHub Bot commented on FLINK-3780: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63892061 --- Diff: flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.examples; + +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.CsvOutputFormat; +import org.apache.flink.api.java.utils.DataSetUtils; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.asm.translate.LongValueToIntValue; +import org.apache.flink.graph.asm.translate.TranslateGraphIds; +import org.apache.flink.graph.generator.RMatGraph; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; + +import java.text.NumberFormat; + +/** + * Driver for the library implementation of Jaccard Index. + * + * This example generates an undirected RMat graph with the given scale and --- End diff -- The rest of the examples in Gelly (and all other Flink components) are designed so that they can be run both with user-specified inputs and without (default data). Could you please modify this example to work in a similar way? > Jaccard Similarity > -- > > Key: FLINK-3780 > URL: https://issues.apache.org/jira/browse/FLINK-3780 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Implement a Jaccard Similarity algorithm computing all non-zero similarity > scores. This algorithm is similar to {{TriangleListing}} but instead of > joining two-paths against an edge list we count two-paths. 
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which > relies on {{Graph.getTriplets()}} so only computes similarity scores for > neighbors but not neighbors-of-neighbors. > This algorithm is easily modified for other similarity scores such as > Adamic-Adar similarity where the sum of endpoint degrees is replaced by the > degree of the middle vertex. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3780] [gelly] Jaccard Similarity
Github user greghogan commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63892035 --- Diff: docs/apis/batch/libs/gelly.md --- @@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly edge) attributed graph as i vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each vertex and edge in the output graph stores the common group value and the number of represented elements. +### Jaccard Index + + Overview +The Jaccard Index measures the similarity between vertex neighborhoods. Scores range from 0.0 (no common neighbors) to +1.0 (all neighbors are common). + + Details +Counting common neighbors for pairs of vertices is equivalent to counting the two-paths consisting of two edges --- End diff -- I've also seen triad to mean any three vertices and triplet to mean any three connected vertices. I'll rework this section. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291192#comment-15291192 ] ASF GitHub Bot commented on FLINK-3780: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63890500 --- Diff: docs/apis/batch/libs/gelly.md --- @@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly edge) attributed graph as i vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each vertex and edge in the output graph stores the common group value and the number of represented elements. +### Jaccard Index + + Overview +The Jaccard Index measures the similarity between vertex neighborhoods. Scores range from 0.0 (no common neighbors) to +1.0 (all neighbors are common). + + Details +Counting common neighbors for pairs of vertices is equivalent to counting the two-paths consisting of two edges --- End diff -- By "two-paths" you mean triads? i.e. open triangles? > Jaccard Similarity > -- > > Key: FLINK-3780 > URL: https://issues.apache.org/jira/browse/FLINK-3780 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Implement a Jaccard Similarity algorithm computing all non-zero similarity > scores. This algorithm is similar to {{TriangleListing}} but instead of > joining two-paths against an edge list we count two-paths. > {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which > relies on {{Graph.getTriplets()}} so only computes similarity scores for > neighbors but not neighbors-of-neighbors. > This algorithm is easily modified for other similarity scores such as > Adamic-Adar similarity where the sum of endpoint degrees is replaced by the > degree of the middle vertex. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
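For reference, the score being discussed is the Jaccard Index of two neighborhoods, |N(u) ∩ N(v)| / |N(u) ∪ N(v)|, which is what yields the 0.0 to 1.0 range described in the doc text. A minimal plain-Java sketch of that definition (illustrative only; the class and method names are made up and this is not the Gelly implementation):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class JaccardSketch {

    // Jaccard index of two neighborhoods: |intersection| / |union|.
    // Returns 0.0 when there are no common neighbors, 1.0 when the
    // neighborhoods are identical.
    static double jaccard(Set<Integer> a, Set<Integer> b) {
        Set<Integer> intersection = new HashSet<>(a);
        intersection.retainAll(b);
        Set<Integer> union = new HashSet<>(a);
        union.addAll(b);
        return union.isEmpty() ? 0.0 : (double) intersection.size() / union.size();
    }

    public static void main(String[] args) {
        Set<Integer> neighborsOfU = new HashSet<>(Arrays.asList(1, 2, 3));
        Set<Integer> neighborsOfV = new HashSet<>(Arrays.asList(2, 3, 4));
        // 2 common neighbors out of 4 distinct neighbors
        System.out.println(jaccard(neighborsOfU, neighborsOfV)); // prints 0.5
    }
}
```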
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291197#comment-15291197 ] ASF GitHub Bot commented on FLINK-3780: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63891097 --- Diff: docs/apis/batch/libs/gelly.md --- @@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly edge) attributed graph as i vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each vertex and edge in the output graph stores the common group value and the number of represented elements. +### Jaccard Index + + Overview +The Jaccard Index measures the similarity between vertex neighborhoods. Scores range from 0.0 (no common neighbors) to +1.0 (all neighbors are common). + + Details +Counting common neighbors for pairs of vertices is equivalent to counting the two-paths consisting of two edges +connecting the two vertices to the common neighbor. The number of distinct neighbors for pairs of vertices is computed +by storing the sum of degrees of the vertex pair and subtracting the count of common neighbors, which are double-counted +in the sum of degrees. + +The algorithm first annotates each edge with the endpoint degree. Grouping on the midpoint vertex, each pair of +neighbors is emitted with the endpoint degree sum. Grouping on two-paths, the common neighbors are counted. + + Usage +The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing two vertex IDs, +the number of common neighbors, and the number of distinct neighbors. The graph ID type must be `Comparable` and --- End diff -- Why doesn't the algorithm return the jaccard index? 
> Jaccard Similarity > -- > > Key: FLINK-3780 > URL: https://issues.apache.org/jira/browse/FLINK-3780 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Implement a Jaccard Similarity algorithm computing all non-zero similarity > scores. This algorithm is similar to {{TriangleListing}} but instead of > joining two-paths against an edge list we count two-paths. > {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which > relies on {{Graph.getTriplets()}} so only computes similarity scores for > neighbors but not neighbors-of-neighbors. > This algorithm is easily modified for other similarity scores such as > Adamic-Adar similarity where the sum of endpoint degrees is replaced by the > degree of the middle vertex. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
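The double-counting argument in the doc text above can be written out directly: the number of distinct neighbors of a pair is the sum of the two degrees minus the common-neighbor count, which is why the algorithm only needs to carry degree sums and two-path counts rather than full neighbor sets. A small sketch of that identity (illustrative only, not library code; the class and method names are invented):

```java
public class DistinctNeighborCount {

    // |N(u) ∪ N(v)| = deg(u) + deg(v) - |N(u) ∩ N(v)|,
    // because each common neighbor is counted once in deg(u)
    // and once more in deg(v).
    static long distinctNeighbors(long degreeU, long degreeV, long sharedNeighbors) {
        return degreeU + degreeV - sharedNeighbors;
    }

    public static void main(String[] args) {
        // u has 4 neighbors, v has 3 neighbors, and they share 2 of them:
        System.out.println(distinctNeighbors(4, 3, 2)); // prints 5
        // the Jaccard index for this pair would then be 2 / 5 = 0.4
    }
}
```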
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291195#comment-15291195 ] ASF GitHub Bot commented on FLINK-3780: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63890887 --- Diff: docs/apis/batch/libs/gelly.md --- @@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly edge) attributed graph as i vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each vertex and edge in the output graph stores the common group value and the number of represented elements. +### Jaccard Index + + Overview +The Jaccard Index measures the similarity between vertex neighborhoods. Scores range from 0.0 (no common neighbors) to +1.0 (all neighbors are common). + + Details +Counting common neighbors for pairs of vertices is equivalent to counting the two-paths consisting of two edges +connecting the two vertices to the common neighbor. The number of distinct neighbors for pairs of vertices is computed +by storing the sum of degrees of the vertex pair and subtracting the count of common neighbors, which are double-counted +in the sum of degrees. + +The algorithm first annotates each edge with the endpoint degree. Grouping on the midpoint vertex, each pair of --- End diff -- Source, target or both endpoints? What is a "midpoint" vertex? Is this standard terminology? It might help to give a small example here. > Jaccard Similarity > -- > > Key: FLINK-3780 > URL: https://issues.apache.org/jira/browse/FLINK-3780 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Implement a Jaccard Similarity algorithm computing all non-zero similarity > scores. This algorithm is similar to {{TriangleListing}} but instead of > joining two-paths against an edge list we count two-paths. 
> {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which > relies on {{Graph.getTriplets()}} so only computes similarity scores for > neighbors but not neighbors-of-neighbors. > This algorithm is easily modified for other similarity scores such as > Adamic-Adar similarity where the sum of endpoint degrees is replaced by the > degree of the middle vertex. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
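The "midpoint vertex" grouping in the doc text can be made concrete with a toy example: for each vertex w, every unordered pair (u, v) of w's neighbors forms one two-path u-w-v (an open triangle, if u and v are not themselves adjacent), so counting two-paths per pair counts common neighbors. A plain-Java sketch of this grouping (illustrative only; not the DataSet-based Gelly implementation, and all names here are invented):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoPaths {

    // For each midpoint vertex, emit every unordered pair of its
    // neighbors; the count per pair equals the pair's number of
    // common neighbors.
    static Map<String, Integer> countTwoPaths(Map<Integer, List<Integer>> adjacency) {
        Map<String, Integer> counts = new HashMap<>();
        for (List<Integer> neighbors : adjacency.values()) {
            List<Integer> n = new ArrayList<>(neighbors);
            Collections.sort(n); // canonical order so (u, v) and (v, u) collapse
            for (int i = 0; i < n.size(); i++) {
                for (int j = i + 1; j < n.size(); j++) {
                    counts.merge(n.get(i) + "," + n.get(j), 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Undirected square 0-1-2-3 with the diagonal 0-2:
        Map<Integer, List<Integer>> adj = new HashMap<>();
        adj.put(0, Arrays.asList(1, 2, 3));
        adj.put(1, Arrays.asList(0, 2));
        adj.put(2, Arrays.asList(0, 1, 3));
        adj.put(3, Arrays.asList(0, 2));
        // Vertices 1 and 3 have the two common neighbors 0 and 2,
        // i.e. the two-paths 1-0-3 and 1-2-3:
        System.out.println(countTwoPaths(adj).get("1,3")); // prints 2
    }
}
```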
[jira] [Commented] (FLINK-3780) Jaccard Similarity
[ https://issues.apache.org/jira/browse/FLINK-3780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291193#comment-15291193 ] ASF GitHub Bot commented on FLINK-3780: --- Github user vasia commented on a diff in the pull request: https://github.com/apache/flink/pull/1980#discussion_r63890630 --- Diff: docs/apis/batch/libs/gelly.md --- @@ -2051,6 +2052,26 @@ The algorithm takes a directed, vertex (and possibly edge) attributed graph as i vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each vertex and edge in the output graph stores the common group value and the number of represented elements. +### Jaccard Index + + Overview +The Jaccard Index measures the similarity between vertex neighborhoods. Scores range from 0.0 (no common neighbors) to +1.0 (all neighbors are common). + + Details +Counting common neighbors for pairs of vertices is equivalent to counting the two-paths consisting of two edges +connecting the two vertices to the common neighbor. The number of distinct neighbors for pairs of vertices is computed --- End diff -- distinct _common_ neighbors? > Jaccard Similarity > -- > > Key: FLINK-3780 > URL: https://issues.apache.org/jira/browse/FLINK-3780 > Project: Flink > Issue Type: New Feature > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan > Fix For: 1.1.0 > > > Implement a Jaccard Similarity algorithm computing all non-zero similarity > scores. This algorithm is similar to {{TriangleListing}} but instead of > joining two-paths against an edge list we count two-paths. > {{flink-gelly-examples}} currently has {{JaccardSimilarityMeasure}} which > relies on {{Graph.getTriplets()}} so only computes similarity scores for > neighbors but not neighbors-of-neighbors. 
> This algorithm is easily modified for other similarity scores such as > Adamic-Adar similarity where the sum of endpoint degrees is replaced by the > degree of the middle vertex. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vijay Srinivasaraghavan reassigned FLINK-3929: -- Assignee: Vijay Srinivasaraghavan (was: Eron Wright ) > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1745) Add exact k-nearest-neighbours algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291134#comment-15291134 ] ASF GitHub Bot commented on FLINK-1745: --- Github user danielblazevski commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r63882747 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.ml.nn + +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.utils._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common._ +import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector} +import org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, DistanceMetric, +EuclideanDistanceMetric} +import org.apache.flink.ml.pipeline.{FitOperation, PredictDataSetOperation, Predictor} +import org.apache.flink.util.Collector +import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint + +import scala.collection.immutable.Vector +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +/** Implements a k-nearest neighbor join. + * + * Calculates the `k`-nearest neighbor points in the training set for each point in the test set. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = ... + * val testingDS: DataSet[Vector] = ... + * + * val knn = KNN() + * .setK(10) + * .setBlocks(5) + * .setDistanceMetric(EuclideanDistanceMetric()) + * + * knn.fit(trainingDS) + * + * val predictionDS: DataSet[(Vector, Array[Vector])] = knn.predict(testingDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.nn.KNN.K]] + * Sets the K which is the number of selected points as neighbors. (Default value: '''5''') + * + * - [[org.apache.flink.ml.nn.KNN.DistanceMetric]] + * Sets the distance metric we use to calculate the distance between two points. If no metric is + * specified, then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used. + * (Default value: '''EuclideanDistanceMetric()''') + * + * - [[org.apache.flink.ml.nn.KNN.Blocks]] + * Sets the number of blocks into which the input data will be split. This number should be set + * at least to the degree of parallelism. 
If no value is specified, then the parallelism of the + * input [[DataSet]] is used as the number of blocks. (Default value: '''None''') + * + * - [[org.apache.flink.ml.nn.KNN.UseQuadTreeParam]] + * A boolean variable that determines whether or not to use a Quadtree to partition the training set + * to potentially simplify the KNN search. If no value is specified, the code will + * automatically decide whether or not to use a Quadtree. Use of a Quadtree scales well + * with the number of training and testing points, though poorly with the dimension. + * (Default value: ```None```) + * + * - [[org.apache.flink.ml.nn.KNN.SizeHint]] + * Specifies whether the training set or test set is small to optimize the cross + * product operation needed for the KNN search. If the training set is small + * this should be `CrossHint.FIRST_IS_SMALL` and set to `CrossHint.SECOND_IS_SMALL` + * if the test set is small. + * (Default value: ```None```) + * + */ + +class KNN extends Predictor[KNN] { + + import KNN._ + + var trainingSet: Option[DataSet[Block[FlinkVector]]] = None + + /** Sets K +* @param k the number of selected points as neighbors +*/ + def setK(k: Int): KNN = { +require(k > 0, "K
[GitHub] flink pull request: [FLINK-1745] Add exact k-nearest-neighbours al...
Github user danielblazevski commented on a diff in the pull request: https://github.com/apache/flink/pull/1220#discussion_r63882747 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.nn + +import org.apache.flink.api.common.operators.Order +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.scala.utils._ +import org.apache.flink.api.scala._ +import org.apache.flink.ml.common._ +import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector} +import org.apache.flink.ml.metrics.distances.{SquaredEuclideanDistanceMetric, DistanceMetric, +EuclideanDistanceMetric} +import org.apache.flink.ml.pipeline.{FitOperation, PredictDataSetOperation, Predictor} +import org.apache.flink.util.Collector +import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossHint + +import scala.collection.immutable.Vector +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +/** Implements a k-nearest neighbor join. 
+ * + * Calculates the `k`-nearest neighbor points in the training set for each point in the test set. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = ... + * val testingDS: DataSet[Vector] = ... + * + * val knn = KNN() + * .setK(10) + * .setBlocks(5) + * .setDistanceMetric(EuclideanDistanceMetric()) + * + * knn.fit(trainingDS) + * + * val predictionDS: DataSet[(Vector, Array[Vector])] = knn.predict(testingDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.nn.KNN.K]] + * Sets the K which is the number of selected points as neighbors. (Default value: '''5''') + * + * - [[org.apache.flink.ml.nn.KNN.DistanceMetric]] + * Sets the distance metric we use to calculate the distance between two points. If no metric is + * specified, then [[org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric]] is used. + * (Default value: '''EuclideanDistanceMetric()''') + * + * - [[org.apache.flink.ml.nn.KNN.Blocks]] + * Sets the number of blocks into which the input data will be split. This number should be set + * at least to the degree of parallelism. If no value is specified, then the parallelism of the + * input [[DataSet]] is used as the number of blocks. (Default value: '''None''') + * + * - [[org.apache.flink.ml.nn.KNN.UseQuadTreeParam]] + * A boolean variable that determines whether or not to use a Quadtree to partition the training set + * to potentially simplify the KNN search. If no value is specified, the code will + * automatically decide whether or not to use a Quadtree. Use of a Quadtree scales well + * with the number of training and testing points, though poorly with the dimension. + * (Default value: ```None```) + * + * - [[org.apache.flink.ml.nn.KNN.SizeHint]] + * Specifies whether the training set or test set is small to optimize the cross + * product operation needed for the KNN search. If the training set is small + * this should be `CrossHint.FIRST_IS_SMALL` and set to `CrossHint.SECOND_IS_SMALL` + * if the test set is small. 
+ * (Default value: ```None```) + * + */ + +class KNN extends Predictor[KNN] { + + import KNN._ + + var trainingSet: Option[DataSet[Block[FlinkVector]]] = None + + /** Sets K +* @param k the number of selected points as neighbors +*/ + def setK(k: Int): KNN = { +require(k > 0, "K must be positive.") +parameters.add(K, k) +this + } + + /** Sets the distance metric +* @param metric the distance metric to calculate distance between two points +*/ + def
[GitHub] flink pull request: [FLINK-3909] update Maven Failsafe version
Github user mxm commented on the pull request: https://github.com/apache/flink/pull/2003#issuecomment-220321817 Thanks for looking into the problem and providing a detailed explanation @markreddy. I think we're good to go with the newest version of the Surefire plugin. I verified the tests 1) fail when errors occur 2) are actually run (added some debug output) 3) the number of tests/ITCases remains unchanged. Just running one more time because I forgot to set the default of `reuseForks` to false for the integration tests. Reusing of forks doesn't work for some tests because of garbage collection problems (this was the same for the Failsafe plugin). Apparently almost all tests pass. So looking forward to merging this soon. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3909) Maven Failsafe plugin may report SUCCESS on failed tests
[ https://issues.apache.org/jira/browse/FLINK-3909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291081#comment-15291081 ] ASF GitHub Bot commented on FLINK-3909: --- Github user mxm commented on the pull request: https://github.com/apache/flink/pull/2003#issuecomment-220321817 Thanks for looking into the problem and providing a detailed explanation @markreddy. I think we're good to go with the newest version of the Surefire plugin. I verified the tests 1) fail when errors occur 2) are actually run (added some debug output) 3) the number of tests/ITCases remains unchanged. Just running one more time because I forgot to set the default of `reuseForks` to false for the integration tests. Reusing of forks doesn't work for some tests because of garbage collection problems (this was the same for the Failsafe plugin). Apparently almost all tests pass. So looking forward to merging this soon. > Maven Failsafe plugin may report SUCCESS on failed tests > > > Key: FLINK-3909 > URL: https://issues.apache.org/jira/browse/FLINK-3909 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > > The following build completed successfully on Travis but there are actually > test failures: https://travis-ci.org/apache/flink/jobs/129943398#L5402 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3936) Add MIN / MAX aggregations function for BOOLEAN types
Fabian Hueske created FLINK-3936: Summary: Add MIN / MAX aggregations function for BOOLEAN types Key: FLINK-3936 URL: https://issues.apache.org/jira/browse/FLINK-3936 Project: Flink Issue Type: Improvement Components: Table API Affects Versions: 1.1.0 Reporter: Fabian Hueske Fix For: 1.1.0 When executing TPC-H Q4, I observed that Calcite generates a MIN aggregate on Boolean literals to translate a decorrelated subquery in an {{EXISTS}} predicate. MIN and MAX aggregates on Boolean data types are currently not supported and should be added.
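For Boolean types the requested aggregates have a natural meaning under the usual ordering false < true: MIN over a group is a logical AND, and MAX is a logical OR. The sketch below only illustrates these semantics in plain Java; it is not Flink's Table API implementation, and the class name is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Illustration of MIN/MAX semantics on BOOLEAN values, assuming the
// standard ordering false < true (as in Boolean.compare). Hypothetical
// helper class, not Flink code.
public class BooleanAggregates {

    // MIN over booleans is a conjunction: true only if every value is true.
    static boolean min(List<Boolean> values) {
        return values.stream().reduce(true, (a, b) -> a && b);
    }

    // MAX over booleans is a disjunction: true if any value is true.
    static boolean max(List<Boolean> values) {
        return values.stream().reduce(false, (a, b) -> a || b);
    }

    public static void main(String[] args) {
        List<Boolean> vals = Arrays.asList(true, false, true);
        System.out.println(min(vals)); // false
        System.out.println(max(vals)); // true
    }
}
```

This is also why such aggregates are useful for translating EXISTS: MAX over a column of `true` literals produced by the subquery answers "did any row match?".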
[jira] [Commented] (FLINK-1502) Expose metrics to graphite, ganglia and JMX.
[ https://issues.apache.org/jira/browse/FLINK-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291073#comment-15291073 ] ASF GitHub Bot commented on FLINK-1502: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1947#issuecomment-220319335 Let me grab the token for this one. There are a few things still, like resource leaks, in this code. I'll pass you back the token as soon as I am done... > Expose metrics to graphite, ganglia and JMX. > > > Key: FLINK-1502 > URL: https://issues.apache.org/jira/browse/FLINK-1502 > Project: Flink > Issue Type: Sub-task > Components: JobManager, TaskManager >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Chesnay Schepler >Priority: Minor > Fix For: pre-apache > > > The metrics library allows to expose collected metrics easily to other > systems such as graphite, ganglia or Java's JVM (VisualVM). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1502] [WIP] Basic Metric System
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1947#issuecomment-220319335 Let me grab the token for this one. There are a few things still, like resource leaks, in this code. I'll pass you back the token as soon as I am done...
[jira] [Commented] (FLINK-2220) Join on Pojo without hashCode() silently fails
[ https://issues.apache.org/jira/browse/FLINK-2220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291048#comment-15291048 ] ASF GitHub Bot commented on FLINK-2220: --- Github user gallenvara closed the pull request at: https://github.com/apache/flink/pull/1940 > Join on Pojo without hashCode() silently fails > -- > > Key: FLINK-2220 > URL: https://issues.apache.org/jira/browse/FLINK-2220 > Project: Flink > Issue Type: Bug >Affects Versions: 0.9, 0.8.1 >Reporter: Marcus Leich > > I need to perform a join using a complete Pojo as join key. > With DOP > 1 this only works if the Pojo comes with a meaningful hashCode() > implementation, as otherwise equal objects will get hashed to different > partitions based on their memory address and not on the content. > I guess it's fine if users are required to implement hashCode() themselves, > but it would be nice if documentation or better yet, Flink itself could alert > users that this is a requirement, similar to how Comparable is required for > keys. 
> Use the following code to reproduce the issue: > public class Pojo implements Comparable { > public byte[] data; > public Pojo () { > } > public Pojo (byte[] data) { > this.data = data; > } > @Override > public int compareTo(Pojo o) { > return UnsignedBytes.lexicographicalComparator().compare(data, > o.data); > } > // uncomment me for making the join work > /* @Override > public int hashCode() { > return Arrays.hashCode(data); > }*/ > } > public void testJoin () throws Exception { > final ExecutionEnvironment env = > ExecutionEnvironment.createLocalEnvironment(); > env.setParallelism(4); > DataSet> left = env.fromElements( > new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "black"), > new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}), "red"), > new Tuple2<>(new Pojo(new byte[] {1}), "Spark"), > new Tuple2<>(new Pojo(new byte[] {2}), "good"), > new Tuple2<>(new Pojo(new byte[] {5}), "bug")); > DataSet > right = env.fromElements( > new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "white"), > new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}), > "green"), > new Tuple2<>(new Pojo(new byte[] {1}), "Flink"), > new Tuple2<>(new Pojo(new byte[] {2}), "evil"), > new Tuple2<>(new Pojo(new byte[] {5}), "fix")); > // will not print anything unless Pojo has a real hashCode() > implementation > > left.join(right).where(0).equalTo(0).projectFirst(1).projectSecond(1).print(); > } -- This message was sent by Atlassian JIRA (v6.3.4#6332)
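The commented-out method in the repro above already points at the user-side fix: give the join-key POJO a content-based hashCode(), paired with a matching equals(). A minimal sketch mirroring the reported Pojo (the class below is an illustration, not code from the PR):

```java
import java.util.Arrays;

// Sketch of a content-based hashCode()/equals() pair for a POJO used as a
// join key, so equal contents hash to the same partition regardless of
// object identity. Mirrors the byte[]-keyed Pojo from the report.
public class Pojo {
    public byte[] data;

    public Pojo() {}                       // POJOs need a public no-arg constructor

    public Pojo(byte[] data) { this.data = data; }

    @Override
    public int hashCode() {
        return Arrays.hashCode(data);      // hashes the array content, not the reference
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Pojo && Arrays.equals(data, ((Pojo) o).data);
    }
}
```

With this in place, two Pojo instances built from equal byte arrays land in the same hash partition, so the join in the repro produces output.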
[jira] [Commented] (FLINK-2220) Join on Pojo without hashCode() silently fails
[ https://issues.apache.org/jira/browse/FLINK-2220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291045#comment-15291045 ] ASF GitHub Bot commented on FLINK-2220: --- Github user gallenvara commented on the pull request: https://github.com/apache/flink/pull/1940#issuecomment-220316125 @fhueske I can't reproduce the issue on another computer, and I'm not sure whether forgetting the default (no-argument) constructor is what caused the problem. I emailed the reporter but got no response. I will close the PR. Thanks. > Join on Pojo without hashCode() silently fails > -- > > Key: FLINK-2220 > URL: https://issues.apache.org/jira/browse/FLINK-2220 > Project: Flink > Issue Type: Bug >Affects Versions: 0.9, 0.8.1 >Reporter: Marcus Leich > > I need to perform a join using a complete Pojo as join key. > With DOP > 1 this only works if the Pojo comes with a meaningful hashCode() > implementation, as otherwise equal objects will get hashed to different > partitions based on their memory address and not on the content. > I guess it's fine if users are required to implement hashCode() themselves, > but it would be nice if documentation or better yet, Flink itself could alert > users that this is a requirement, similar to how Comparable is required for > keys. 
> Use the following code to reproduce the issue: > public class Pojo implements Comparable { > public byte[] data; > public Pojo () { > } > public Pojo (byte[] data) { > this.data = data; > } > @Override > public int compareTo(Pojo o) { > return UnsignedBytes.lexicographicalComparator().compare(data, > o.data); > } > // uncomment me for making the join work > /* @Override > public int hashCode() { > return Arrays.hashCode(data); > }*/ > } > public void testJoin () throws Exception { > final ExecutionEnvironment env = > ExecutionEnvironment.createLocalEnvironment(); > env.setParallelism(4); > DataSet> left = env.fromElements( > new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "black"), > new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}), "red"), > new Tuple2<>(new Pojo(new byte[] {1}), "Spark"), > new Tuple2<>(new Pojo(new byte[] {2}), "good"), > new Tuple2<>(new Pojo(new byte[] {5}), "bug")); > DataSet > right = env.fromElements( > new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "white"), > new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}), > "green"), > new Tuple2<>(new Pojo(new byte[] {1}), "Flink"), > new Tuple2<>(new Pojo(new byte[] {2}), "evil"), > new Tuple2<>(new Pojo(new byte[] {5}), "fix")); > // will not print anything unless Pojo has a real hashCode() > implementation > > left.join(right).where(0).equalTo(0).projectFirst(1).projectSecond(1).print(); > } -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2220] Join on Pojo without hashCode() s...
Github user gallenvara commented on the pull request: https://github.com/apache/flink/pull/1940#issuecomment-220316125 @fhueske I can't reproduce the issue on another computer, and I'm not sure whether forgetting the default (no-argument) constructor is what caused the problem. I emailed the reporter but got no response. I will close the PR. Thanks.
[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra
[ https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291038#comment-15291038 ] ASF GitHub Bot commented on FLINK-3311: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r63871737 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java --- @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +/** + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra + * database. 
+ * + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint| + */ +public class CassandraCommitter extends CheckpointCommitter { + private ClusterBuilder builder; + private transient Cluster cluster; + private transient Session session; + + private String keySpace = "flink_auxiliary"; + private String table = "checkpoints_"; + + private transient PreparedStatement deleteStatement; + private transient PreparedStatement updateStatement; + private transient PreparedStatement selectStatement; + + public CassandraCommitter(ClusterBuilder builder) { + this.builder = builder; + ClosureCleaner.clean(builder, true); + } + + public CassandraCommitter(ClusterBuilder builder, String keySpace) { + this(builder); + this.keySpace = keySpace; + } + + /** +* Internally used to set the job ID after instantiation. +* +* @param id +* @throws Exception +*/ + public void setJobId(String id) throws Exception { + super.setJobId(id); + table += id; + } + + /** +* Generates the necessary tables to store information. +* +* @return +* @throws Exception +*/ + @Override + public void createResource() throws Exception { + cluster = builder.getCluster(); + session = cluster.connect(); + + session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", keySpace)); + session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table)); + + try { + session.close(); + } catch (Exception e) { + LOG.error("Error while closing session.", e); --- End diff -- correct. > Add a connector for streaming data into Cassandra > - > > Key: FLINK-3311 > URL: https://issues.apache.org/jira/browse/FLINK-3311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Andrea Sella > > We had users in the past asking for a Flink+Cassandra integration. 
> It seems that there is a well-developed java client for connecting into > Cassandra: https://github.com/datastax/java-driver (ASL 2.0) > There are also tutorials out there on how to start a local cassandra instance > (for the tests): > http://prettyprint.me/prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html > For the data types, I think we should support TupleX types, and map standard > java types to the respective cassandra types. > In
[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r63871737 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java --- @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +/** + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra + * database. 
+ * + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint| + */ +public class CassandraCommitter extends CheckpointCommitter { + private ClusterBuilder builder; + private transient Cluster cluster; + private transient Session session; + + private String keySpace = "flink_auxiliary"; + private String table = "checkpoints_"; + + private transient PreparedStatement deleteStatement; + private transient PreparedStatement updateStatement; + private transient PreparedStatement selectStatement; + + public CassandraCommitter(ClusterBuilder builder) { + this.builder = builder; + ClosureCleaner.clean(builder, true); + } + + public CassandraCommitter(ClusterBuilder builder, String keySpace) { + this(builder); + this.keySpace = keySpace; + } + + /** +* Internally used to set the job ID after instantiation. +* +* @param id +* @throws Exception +*/ + public void setJobId(String id) throws Exception { + super.setJobId(id); + table += id; + } + + /** +* Generates the necessary tables to store information. +* +* @return +* @throws Exception +*/ + @Override + public void createResource() throws Exception { + cluster = builder.getCluster(); + session = cluster.connect(); + + session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", keySpace)); + session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table)); + + try { + session.close(); + } catch (Exception e) { + LOG.error("Error while closing session.", e); --- End diff -- correct. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3933) Add an auto-type-extracting DeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291024#comment-15291024 ] ASF GitHub Bot commented on FLINK-3933: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2010#discussion_r63869803 --- Diff: docs/apis/streaming/connectors/kafka.md --- @@ -142,18 +142,24 @@ for querying the list of topics and partitions. For this to work, the consumer needs to be able to access the consumers from the machine submitting the job to the Flink cluster. If you experience any issues with the Kafka consumer on the client side, the client log might contain information about failed requests, etc. -# The `DeserializationSchema` +# **The `DeserializationSchema`** --- End diff -- `kafka.md` uses very low level headings. The batch and streaming guides use level 2 headings for top-level headings while `kafka.md` uses level 4. I think the solution is to reduce heading level by 2 on all levels in this document, which would yield ``` ### The `DeserializationSchema` ``` > Add an auto-type-extracting DeserializationSchema > - > > Key: FLINK-3933 > URL: https://issues.apache.org/jira/browse/FLINK-3933 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.1.0 > > > When creating a {{DeserializationSchema}}, people need to manually worry > about how to provide the produced type's {{TypeInformation}}. > We should add a base utility {{AbstractDeserializationSchema}} that provides > that automatically via the type extractor. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3933] [streaming API] Add AbstractDeser...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2010#discussion_r63869803 --- Diff: docs/apis/streaming/connectors/kafka.md --- @@ -142,18 +142,24 @@ for querying the list of topics and partitions. For this to work, the consumer needs to be able to access the consumers from the machine submitting the job to the Flink cluster. If you experience any issues with the Kafka consumer on the client side, the client log might contain information about failed requests, etc. -# The `DeserializationSchema` +# **The `DeserializationSchema`** --- End diff -- `kafka.md` uses very low level headings. The batch and streaming guides use level 2 headings for top-level headings while `kafka.md` uses level 4. I think the solution is to reduce heading level by 2 on all levels in this document, which would yield ``` ### The `DeserializationSchema` ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2220] Join on Pojo without hashCode() s...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1940#issuecomment-220310633 Any update for this PR, @gallenvara?
[jira] [Commented] (FLINK-2220) Join on Pojo without hashCode() silently fails
[ https://issues.apache.org/jira/browse/FLINK-2220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291021#comment-15291021 ] ASF GitHub Bot commented on FLINK-2220: --- Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/1940#issuecomment-220310633 Any update for this PR, @gallenvara? > Join on Pojo without hashCode() silently fails > -- > > Key: FLINK-2220 > URL: https://issues.apache.org/jira/browse/FLINK-2220 > Project: Flink > Issue Type: Bug >Affects Versions: 0.9, 0.8.1 >Reporter: Marcus Leich > > I need to perform a join using a complete Pojo as join key. > With DOP > 1 this only works if the Pojo comes with a meaningful hashCode() > implementation, as otherwise equal objects will get hashed to different > partitions based on their memory address and not on the content. > I guess it's fine if users are required to implement hashCode() themselves, > but it would be nice if documentation or better yet, Flink itself could alert > users that this is a requirement, similar to how Comparable is required for > keys. 
> Use the following code to reproduce the issue: > public class Pojo implements Comparable { > public byte[] data; > public Pojo () { > } > public Pojo (byte[] data) { > this.data = data; > } > @Override > public int compareTo(Pojo o) { > return UnsignedBytes.lexicographicalComparator().compare(data, > o.data); > } > // uncomment me for making the join work > /* @Override > public int hashCode() { > return Arrays.hashCode(data); > }*/ > } > public void testJoin () throws Exception { > final ExecutionEnvironment env = > ExecutionEnvironment.createLocalEnvironment(); > env.setParallelism(4); > DataSet> left = env.fromElements( > new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "black"), > new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}), "red"), > new Tuple2<>(new Pojo(new byte[] {1}), "Spark"), > new Tuple2<>(new Pojo(new byte[] {2}), "good"), > new Tuple2<>(new Pojo(new byte[] {5}), "bug")); > DataSet > right = env.fromElements( > new Tuple2<>(new Pojo(new byte[] {0, 24, 23, 1, 3}), "white"), > new Tuple2<>(new Pojo(new byte[] {0, 14, 13, 14, 13}), > "green"), > new Tuple2<>(new Pojo(new byte[] {1}), "Flink"), > new Tuple2<>(new Pojo(new byte[] {2}), "evil"), > new Tuple2<>(new Pojo(new byte[] {5}), "fix")); > // will not print anything unless Pojo has a real hashCode() > implementation > > left.join(right).where(0).equalTo(0).projectFirst(1).projectSecond(1).print(); > } -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3311/FLINK-3332] Add Cassandra connecto...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r63869055 --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraCommitter.java --- @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.streaming.runtime.operators.CheckpointCommitter; + +/** + * CheckpointCommitter that saves information about completed checkpoints within a separate table in a cassandra + * database. 
+ * + * Entries are in the form |operator_id | subtask_id | last_completed_checkpoint| + */ +public class CassandraCommitter extends CheckpointCommitter { + private ClusterBuilder builder; + private transient Cluster cluster; + private transient Session session; + + private String keySpace = "flink_auxiliary"; + private String table = "checkpoints_"; + + private transient PreparedStatement deleteStatement; + private transient PreparedStatement updateStatement; + private transient PreparedStatement selectStatement; + + public CassandraCommitter(ClusterBuilder builder) { + this.builder = builder; + ClosureCleaner.clean(builder, true); + } + + public CassandraCommitter(ClusterBuilder builder, String keySpace) { + this(builder); + this.keySpace = keySpace; + } + + /** +* Internally used to set the job ID after instantiation. +* +* @param id +* @throws Exception +*/ + public void setJobId(String id) throws Exception { + super.setJobId(id); + table += id; + } + + /** +* Generates the necessary tables to store information. +* +* @return +* @throws Exception +*/ + @Override + public void createResource() throws Exception { + cluster = builder.getCluster(); + session = cluster.connect(); + + session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':3};", keySpace)); + session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", keySpace, table)); + + try { + session.close(); + } catch (Exception e) { + LOG.error("Error while closing session.", e); --- End diff -- This means that a failed closing operation cannot leave the external system in a corrupted state? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
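The pattern under discussion in the diff (logging rather than rethrowing a failed close()) can be shown in isolation. This is a generic sketch, not the connector's actual code; the class name and log string are illustrative:

```java
import java.io.Closeable;

// Sketch: a close() failure on a short-lived client handle is logged and
// swallowed. This is safe only when the preceding operations already made
// the external system's state durable, so a failed close cannot corrupt it;
// at worst the local handle leaks.
public class QuietClose {
    static String lastError = "";

    static void closeQuietly(Closeable resource) {
        try {
            resource.close();
        } catch (Exception e) {
            // Do not rethrow: the remote work is already committed.
            lastError = "Error while closing resource: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        closeQuietly(() -> { throw new java.io.IOException("connection reset"); });
        System.out.println(lastError);
    }
}
```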
[jira] [Commented] (FLINK-3311) Add a connector for streaming data into Cassandra
[ https://issues.apache.org/jira/browse/FLINK-3311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291020#comment-15291020 ] ASF GitHub Bot commented on FLINK-3311: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1771#discussion_r63869055 
> Add a connector for streaming data into Cassandra > - > > Key: FLINK-3311 > URL: https://issues.apache.org/jira/browse/FLINK-3311 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Robert Metzger >Assignee: Andrea Sella > > We had users in the past asking for a Flink+Cassandra integration. > It seems that there is a well-developed java client for connecting into > Cassandra: https://github.com/datastax/java-driver (ASL 2.0) > There are also tutorials out there on how to start a local cassandra instance > (for the tests): > http://prettyprint.me/2010/02/14/running-cassandra-as-an-embedded-service/index.html > For the data types, I think we should
[jira] [Commented] (FLINK-3933) Add an auto-type-extracting DeserializationSchema
[ https://issues.apache.org/jira/browse/FLINK-3933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291017#comment-15291017 ] ASF GitHub Bot commented on FLINK-3933: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2010#discussion_r63868527 --- Diff: docs/apis/streaming/connectors/kafka.md --- @@ -142,18 +142,24 @@ for querying the list of topics and partitions. For this to work, the consumer needs to be able to access the consumers from the machine submitting the job to the Flink cluster. If you experience any issues with the Kafka consumer on the client side, the client log might contain information about failed requests, etc. -# The `DeserializationSchema` +# **The `DeserializationSchema`** --- End diff -- The whole doc uses pretty low weight formatting. It is hard to recognize a heading as such. I overlooked the section initially. > Add an auto-type-extracting DeserializationSchema > - > > Key: FLINK-3933 > URL: https://issues.apache.org/jira/browse/FLINK-3933 > Project: Flink > Issue Type: Bug > Components: Streaming >Affects Versions: 1.0.0 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.1.0 > > > When creating a {{DeserializationSchema}}, people need to manually worry > about how to provide the produced type's {{TypeInformation}}. > We should add a base utility {{AbstractDeserializationSchema}} that provides > that automatically via the type extractor. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
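FLINK-3933's premise is that the produced type can be derived from the schema subclass's generic signature instead of being supplied by hand. The reflection mechanism a type extractor can use for this is small enough to sketch without Flink (the class name below is illustrative, not Flink's API):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Sketch: a base class that captures the concrete type argument T from the
// subclass declaration, the way a type extractor can derive the produced
// type without the user supplying it explicitly.
public abstract class TypeCapturing<T> {

    /** Returns the Class the subclass bound to T, if it is a plain class. */
    @SuppressWarnings("unchecked")
    public Class<T> capturedType() {
        Type superType = getClass().getGenericSuperclass();
        if (superType instanceof ParameterizedType) {
            Type arg = ((ParameterizedType) superType).getActualTypeArguments()[0];
            if (arg instanceof Class) {
                return (Class<T>) arg;
            }
        }
        throw new IllegalStateException("Cannot determine type argument of " + getClass());
    }

    public static void main(String[] args) {
        // An anonymous subclass binds T = String, so the binding is visible
        // on the generic superclass at runtime despite erasure.
        TypeCapturing<String> schema = new TypeCapturing<String>() {};
        System.out.println(schema.capturedType().getSimpleName());
    }
}
```

Flink's actual type extractor handles more cases (generic types, POJOs, tuples), but this is the core trick that makes an AbstractDeserializationSchema possible.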
[GitHub] flink pull request: [FLINK-3933] [streaming API] Add AbstractDeser...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2010#discussion_r63868527 
[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams
[ https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291006#comment-15291006 ] Gabor Gevay commented on FLINK-2147: > From a first look, something like StreamGroupedFold would be enough right? Sorry, I'm not sure. I suggest you ask on the mailing list, and then probably someone who knows streaming better than me will respond. Unfortunately, I don't have enough time now to delve deep into this. By the way, maybe you could start with this Jira: https://issues.apache.org/jira/browse/FLINK-2144 There are some similarities to this one, but it is more straightforward to implement. > Approximate calculation of frequencies in data streams > -- > > Key: FLINK-2147 > URL: https://issues.apache.org/jira/browse/FLINK-2147 > Project: Flink > Issue Type: New Feature > Components: Streaming >Reporter: Gabor Gevay > Labels: approximate, statistics > > Count-Min sketch is a hashing-based algorithm for approximately keeping track > of the frequencies of elements in a data stream. It is described by Cormode > et al. in the following paper: > http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf > Note that this algorithm can be conveniently implemented in a distributed > way, as described in section 3.2 of the paper. > The paper > http://www.vldb.org/conf/2002/S10P03.pdf > also describes algorithms for approximately keeping track of frequencies, but > here the user can specify a threshold below which she is not interested in > the frequency of an element. The error bounds are also different from those > of the Count-Min sketch algorithm. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
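The core of the Count-Min sketch from the cited paper is compact enough to sketch here. This is a minimal single-node version with ad-hoc hash mixing; the paper's error guarantees assume pairwise-independent hash functions, and its distributed variant (section 3.2) merges per-partition tables by element-wise addition, as mergeFrom() illustrates:

```java
import java.util.Random;

// Minimal Count-Min sketch: depth hash rows of width counters each. An update
// increments one counter per row; the frequency estimate is the minimum over
// the rows, which never underestimates and overestimates by a bounded amount
// with high probability.
public class CountMinSketch {
    private final int depth, width;
    private final long[][] counts;
    private final int[] seeds;

    public CountMinSketch(int depth, int width, long seed) {
        this.depth = depth;
        this.width = width;
        this.counts = new long[depth][width];
        this.seeds = new int[depth];
        Random rnd = new Random(seed);
        for (int i = 0; i < depth; i++) {
            seeds[i] = rnd.nextInt();
        }
    }

    private int bucket(int row, Object item) {
        // Simple per-row mixing; a production implementation would use the
        // pairwise-independent hash families the paper's bounds require.
        int h = item.hashCode() ^ seeds[row];
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return Math.floorMod(h, width);
    }

    public void add(Object item) {
        for (int row = 0; row < depth; row++) {
            counts[row][bucket(row, item)]++;
        }
    }

    public long estimate(Object item) {
        long min = Long.MAX_VALUE;
        for (int row = 0; row < depth; row++) {
            min = Math.min(min, counts[row][bucket(row, item)]);
        }
        return min;
    }

    // Element-wise merge for combining sketches built with identical
    // dimensions and seed, e.g. one per stream partition.
    public void mergeFrom(CountMinSketch other) {
        for (int row = 0; row < depth; row++) {
            for (int col = 0; col < width; col++) {
                counts[row][col] += other.counts[row][col];
            }
        }
    }
}
```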
[jira] [Commented] (FLINK-3909) Maven Failsafe plugin may report SUCCESS on failed tests
[ https://issues.apache.org/jira/browse/FLINK-3909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15291005#comment-15291005 ] ASF GitHub Bot commented on FLINK-3909: --- Github user markreddy commented on the pull request: https://github.com/apache/flink/pull/2003#issuecomment-220307660 I took a look at the root issue of upgrading to 2.19.1. I'm able to reliably reproduce this issue on my laptop. When I bump the version, I get the same error as @tillrohrmann: `java.lang.NoSuchMethodError: org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(Lorg/apache/flink/configuration/Configuration;)Lorg/apache/curator/framework/CuratorFramework` When running the build in debug mode for both 2.18.1 and 2.19.1, I observed some differences in the classpath. **2.18.1** `[DEBUG] boot(compact) classpath: surefire-booter-2.18.1.jar surefire-api-2.18.1.jar test-classes classes flink-core-1.1-SNAPSHOT.jar.` **2.19.1** `[DEBUG] boot(compact) classpath: surefire-booter-2.19.1.jar surefire-api-2.19.1.jar test-classes flink-runtime_2.10-1.1-SNAPSHOT.jar flink-core-1.1-SNAPSHOT.jar.` So 2.19.1 has changed behaviour: what is loaded onto the classpath differs. In 2.19.1, instead of loading target/classes, it loads the actual built jar; https://issues.apache.org/jira/browse/SUREFIRE-855 confirms this. The issue is that the final jar has Curator shaded, while the test classes are looking for the unshaded version of Curator, as shown by the debug output: `Lorg/apache/curator/framework/CuratorFramework`. If @mxm can get everything working on a lower version, that's the easiest solution. 
If not, or if we want to proceed with moving up a version, at least we know the root of the issue and can work from there. > Maven Failsafe plugin may report SUCCESS on failed tests > > > Key: FLINK-3909 > URL: https://issues.apache.org/jira/browse/FLINK-3909 > Project: Flink > Issue Type: Bug > Components: Build System >Affects Versions: 1.1.0 >Reporter: Maximilian Michels >Assignee: Maximilian Michels > Fix For: 1.1.0 > > > The following build completed successfully on Travis but there are actually > test failures: https://travis-ci.org/apache/flink/jobs/129943398#L5402 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3909] update Maven Failsafe version
Github user markreddy commented on the pull request: https://github.com/apache/flink/pull/2003#issuecomment-220307660 
[jira] [Created] (FLINK-3935) Invalid check of key and ordering fields in PartitionNode
Fabian Hueske created FLINK-3935: Summary: Invalid check of key and ordering fields in PartitionNode Key: FLINK-3935 URL: https://issues.apache.org/jira/browse/FLINK-3935 Project: Flink Issue Type: Bug Components: Optimizer Affects Versions: 1.1.0 Reporter: Fabian Hueske Assignee: Fabian Hueske Fix For: 1.1.0 {{PartitionNode}} checks for range partitioning that the partition keys and the fields of the ordering are identical. However, the check is not correctly implemented, because {{PartitionNode}} holds the partition keys as an unordered {{FieldSet}}, which is compared against an ordered {{FieldList}} provided by the {{Ordering}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
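The ordered-versus-unordered mismatch can be reduced to plain Java collections, treating FieldSet and FieldList as a Set and List of field indices (illustrative, not Flink's exact types):

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Sketch of why the check is invalid: a set of partition keys {1, 0}
// equals the same keys in any order, but an ordering over fields [0, 1]
// is position-sensitive, and equality across the two collection kinds is
// never true. A correct check must first decide which semantics it wants.
public class KeyOrderCheck {
    public static void main(String[] args) {
        Set<Integer> partitionKeys = new LinkedHashSet<>(Arrays.asList(1, 0));
        List<Integer> orderingFields = Arrays.asList(0, 1);

        // Same fields, so a set-based comparison accepts them...
        boolean sameFields = partitionKeys.equals(new LinkedHashSet<>(orderingFields));
        // ...but naive equality between a Set and a List is always false.
        boolean naiveEquals = partitionKeys.equals(orderingFields);

        System.out.println(sameFields + " " + naiveEquals); // prints: true false
    }
}
```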
[jira] [Created] (FLINK-3934) Prevent translation of non-equi joins in DataSetJoinRule
Fabian Hueske created FLINK-3934: Summary: Prevent translation of non-equi joins in DataSetJoinRule Key: FLINK-3934 URL: https://issues.apache.org/jira/browse/FLINK-3934 Project: Flink Issue Type: Bug Components: Table API Affects Versions: 1.1.0 Reporter: Fabian Hueske Assignee: Fabian Hueske At the moment, non-equi joins are also translated into {{DataSetJoin}}s. To prevent such plans from being picked, we assign huge costs and eventually fail their translation into DataSet programs. A better solution is to prevent a non-equi join from being translated into a {{DataSetJoin}} in the {{DataSetJoinRule}} in the first place. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3761) Introduce key group state backend
[ https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15290909#comment-15290909 ] ASF GitHub Bot commented on FLINK-3761: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/1988#issuecomment-220293946 Yeah, we were also wondering whether it would make sense to allow the state itself to be repartitioned, i.e. unioned and then split into the new parallelism. In this way we wouldn't read all state in every operator. > Introduce key group state backend > - > > Key: FLINK-3761 > URL: https://issues.apache.org/jira/browse/FLINK-3761 > Project: Flink > Issue Type: Sub-task > Components: state backends >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > After an off-line discussion with [~aljoscha], we came to the conclusion that > it would be beneficial to reflect the differences between keyed and > non-keyed streams in the state backends as well. A state backend which is used > for a keyed stream offers a value, list, folding and value state and has to > group its keys into key groups. > A state backend for non-keyed streams can only offer a union state to make it > work with dynamic scaling. A union state is a state which is broadcast to > all tasks in case of a recovery. The state backends can then select what > information they need to recover from the whole state (formerly distributed). -- This message was sent by Atlassian JIRA (v6.3.4#6332)