[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters
[ https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309297#comment-15309297 ] ASF GitHub Bot commented on FLINK-3763: --- Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/2054#discussion_r65303646 --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java --- @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq.common; + +import com.google.common.base.Preconditions; +import com.rabbitmq.client.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +/** + * Connection Configuration for RMQ. + * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection or + * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, String, String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection + */ +public class RMQConnectionConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class); + + private String host; + private int port; + private String virtualHost; + private String username; + private String password; + private String uri; + + private int networkRecoveryInterval; + private boolean automaticRecovery; + private boolean topologyRecovery; + + private int connectionTimeout; + private int requestedChannelMax; + private int requestedFrameMax; + private int requestedHeartbeat; + + /** +* +* @param host host name +* @param port port +* @param virtualHost virtual host +* @param username username +* @param password password + +* @param networkRecoveryInterval connection recovery interval in milliseconds +* @param automaticRecovery if automatic connection recovery +* @param topologyRecovery if topology recovery +* @param connectionTimeout connection timeout +* @param requestedChannelMax requested maximum channel number +* @param requestedFrameMax requested maximum frame size +* @param requestedHeartbeat requested heartbeat interval +* @throws NullPointerException if host or virtual host or username or password is null + */ + private RMQConnectionConfig(String host, int port, String virtualHost, String username, String password, + int networkRecoveryInterval, boolean automaticRecovery, + boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax, + int requestedHeartbeat){ + 
Preconditions.checkNotNull(host, "host can not be null"); + Preconditions.checkNotNull(virtualHost, "virtualHost can not be null"); + Preconditions.checkNotNull(username, "username can not be null"); + Preconditions.checkNotNull(password, "password can not be null"); + this.host = host; + this.port = port; + this.virtualHost = virtualHost; + this.username = username; + this.password = password; + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; +
[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...
Github user subhankarb commented on a diff in the pull request: https://github.com/apache/flink/pull/2054#discussion_r65303646 --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java --- @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq.common; + +import com.google.common.base.Preconditions; +import com.rabbitmq.client.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +/** + * Connection Configuration for RMQ. + * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection or + * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, String, String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection + */ +public class RMQConnectionConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class); + + private String host; + private int port; + private String virtualHost; + private String username; + private String password; + private String uri; + + private int networkRecoveryInterval; + private boolean automaticRecovery; + private boolean topologyRecovery; + + private int connectionTimeout; + private int requestedChannelMax; + private int requestedFrameMax; + private int requestedHeartbeat; + + /** +* +* @param host host name +* @param port port +* @param virtualHost virtual host +* @param username username +* @param password password + +* @param networkRecoveryInterval connection recovery interval in milliseconds +* @param automaticRecovery if automatic connection recovery +* @param topologyRecovery if topology recovery +* @param connectionTimeout connection timeout +* @param requestedChannelMax requested maximum channel number +* @param requestedFrameMax requested maximum frame size +* @param requestedHeartbeat requested heartbeat interval +* @throws NullPointerException if host or virtual host or username or password is null + */ + private RMQConnectionConfig(String host, int port, String virtualHost, String username, String password, + int networkRecoveryInterval, boolean automaticRecovery, + boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax, + int requestedHeartbeat){ + Preconditions.checkNotNull(host, "host can not be null"); + Preconditions.checkNotNull(virtualHost, "virtualHost can not be null"); + Preconditions.checkNotNull(username, "username can not be 
null"); + Preconditions.checkNotNull(password, "password can not be null"); + this.host = host; + this.port = port; + this.virtualHost = virtualHost; + this.username = username; + this.password = password; + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + } + + /** +* +* @param uri
[jira] [Created] (FLINK-4000) Exception: Could not restore checkpointed state to operators and functions; during Job Restart (Job restart is triggered due to one of the task manager failure)
Aride Chettali created FLINK-4000: - Summary: Exception: Could not restore checkpointed state to operators and functions; during Job Restart (Job restart is triggered due to one of the task manager failure) Key: FLINK-4000 URL: https://issues.apache.org/jira/browse/FLINK-4000 Project: Flink Issue Type: Bug Components: DataStream API Affects Versions: 1.0.3 Environment: //Fault Tolerance Configuration of the Job env.enableCheckpointing(5000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3,1)); Reporter: Aride Chettali java.lang.Exception: Could not restore checkpointed state to operators and functions at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.Exception: Failed to restore state to function: null at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449) ... 3 more Caused by: java.lang.NullPointerException at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.restoreState(MessageAcknowledgingSourceBase.java:184) at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.restoreState(MessageAcknowledgingSourceBase.java:80) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165) ... 4 more -- This message was sent by Atlassian JIRA (v6.3.4#6332)
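For reference, here is a minimal, self-contained sketch of the fault-tolerance setup quoted in the issue's environment field (the checkpoint interval, mode, and restart values are taken from the report; the placeholder pipeline is only there to make the sketch executable):

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJobSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5 seconds with exactly-once semantics, at most one checkpoint in flight
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // Restart the job up to 3 times with a 1 ms delay between attempts, as in the report
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1));

        // Placeholder pipeline; the failing job in the report uses an acknowledging message source
        env.fromElements(1, 2, 3).print();

        env.execute("checkpointed job");
    }
}
```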
[jira] [Commented] (FLINK-1979) Implement Loss Functions
[ https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309058#comment-15309058 ] ASF GitHub Bot commented on FLINK-1979: --- Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1985 Hi @skavulya, I just quickly reviewed your updated PR and left a few comments. They are not critical, but it would be better to fix them. Everything else looks very good. > Implement Loss Functions > > > Key: FLINK-1979 > URL: https://issues.apache.org/jira/browse/FLINK-1979 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library >Reporter: Johannes Günther >Assignee: Johannes Günther >Priority: Minor > Labels: ML > > For convex optimization problems, optimizer methods like SGD rely on a > pluggable implementation of a loss function and its first derivative. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/1985 Hi @skavulya, I just quickly reviewed your updated PR and left a few comments. They are not critical, but it would be better to fix them. Everything else looks very good.
[jira] [Commented] (FLINK-1979) Implement Loss Functions
[ https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309056#comment-15309056 ] ASF GitHub Bot commented on FLINK-1979: --- Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1985#discussion_r65291933 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationPenaltyTest.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.optimization + +import org.apache.flink.ml.math.DenseVector +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{FlatSpec, Matchers} + + +class RegularizationPenaltyTest extends FlatSpec with Matchers with FlinkTestBase { --- End diff -- Same as above, we don't need to extend `FlinkTestBase`. > Implement Loss Functions > > > Key: FLINK-1979 > URL: https://issues.apache.org/jira/browse/FLINK-1979 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library >Reporter: Johannes Günther >Assignee: Johannes Günther >Priority: Minor > Labels: ML > > For convex optimization problems, optimizer methods like SGD rely on a > pluggable implementation of a loss function and its first derivative. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1985#discussion_r65291933 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationPenaltyTest.scala --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.optimization + +import org.apache.flink.ml.math.DenseVector +import org.apache.flink.test.util.FlinkTestBase +import org.scalatest.{FlatSpec, Matchers} + + +class RegularizationPenaltyTest extends FlatSpec with Matchers with FlinkTestBase { --- End diff -- Same as above, we don't need to extend `FlinkTestBase`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1979) Implement Loss Functions
[ https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309054#comment-15309054 ] ASF GitHub Bot commented on FLINK-1979: --- Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1985#discussion_r65291911 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionTest.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.optimization + +import org.apache.flink.ml.common.{LabeledVector, WeightVector} +import org.apache.flink.ml.math.DenseVector +import org.scalatest.{Matchers, FlatSpec} +import org.apache.flink.test.util.FlinkTestBase + + +class LossFunctionTest extends FlatSpec with Matchers with FlinkTestBase { --- End diff -- We don't need to extend `FlinkTestBase` because `LossFunctionTest` do not use Flink cluster. > Implement Loss Functions > > > Key: FLINK-1979 > URL: https://issues.apache.org/jira/browse/FLINK-1979 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library >Reporter: Johannes Günther >Assignee: Johannes Günther >Priority: Minor > Labels: ML > > For convex optimization problems, optimizer methods like SGD rely on a > pluggable implementation of a loss function and its first derivative. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1985#discussion_r65291911 --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionTest.scala --- @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.optimization + +import org.apache.flink.ml.common.{LabeledVector, WeightVector} +import org.apache.flink.ml.math.DenseVector +import org.scalatest.{Matchers, FlatSpec} +import org.apache.flink.test.util.FlinkTestBase + + +class LossFunctionTest extends FlatSpec with Matchers with FlinkTestBase { --- End diff -- We don't need to extend `FlinkTestBase` because `LossFunctionTest` do not use Flink cluster. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1985#discussion_r65291552 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/RegularizationPenalty.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.optimization + +import org.apache.flink.ml.math.{Vector, BLAS} +import org.apache.flink.ml.math.Breeze._ +import breeze.linalg.{norm => BreezeNorm} + +/** Represents a type of regularization penalty + * + * Regularization penalties are used to restrict the optimization problem to solutions with + * certain desirable characteristics, such as sparsity for the L1 penalty, or penalizing large + * weights for the L2 penalty. + * + * The regularization term, `R(w)` is added to the objective function, `f(w) = L(w) + lambda*R(w)` + * where lambda is the regularization parameter used to tune the amount of regularization applied. + */ +trait RegularizationPenalty extends Serializable { + + /** Calculates the new weights based on the gradient and regularization penalty +* +* @param weightVector The weights to be updated +* @param gradient The gradient used to update the weights +* @param regularizationConstant The regularization parameter to be applied +* @param learningRate The effective step size for this iteration +* @return Updated weights +*/ + def takeStep( + weightVector: Vector, + gradient: Vector, + regularizationConstant: Double, + learningRate: Double) +: Vector + + /** Adds regularization to the loss value +* +* @param oldLoss The loss to be updated +* @param weightVector The gradient used to update the loss +* @param regularizationConstant The regularization parameter to be applied +* @return Updated loss +*/ + def regLoss(oldLoss: Double, weightVector: Vector, regularizationConstant: Double): Double + +} + + +/** `L_2` regularization penalty. + * + * The regularization function is the square of the L2 norm `1/2*||w||_2^2` + * with `w` being the weight vector. The function penalizes large weights, + * favoring solutions with more small weights rather than few large ones. + */ +object L2Regularization extends RegularizationPenalty { + + /** Calculates the new weights based on the gradient and L2 regularization penalty +* +* The updated weight is `w - learningRate *(gradient + lambda * w)` where +* `w` is the weight vector, and `lambda` is the regularization parameter. 
+* +* @param weightVector The weights to be updated +* @param gradient The gradient according to which we will update the weights +* @param regularizationConstant The regularization parameter to be applied +* @param learningRate The effective step size for this iteration +* @return Updated weights +*/ + override def takeStep( + weightVector: Vector, + gradient: Vector, + regularizationConstant: Double, + learningRate: Double) +: Vector = { +// add the gradient of the L2 regularization +BLAS.axpy(regularizationConstant, weightVector, gradient) + +// update the weights according to the learning rate +BLAS.axpy(-learningRate, gradient, weightVector) + +weightVector + } + + /** Adds regularization to the loss value +* +* The updated loss is `oldLoss + lambda * 1/2*||w||_2^2` where +* `w` is the weight vector, and `lambda` is the regularization parameter +* +* @param oldLoss The loss to be updated +* @param weightVector The gradient used to update the loss +* @param regularizationConstant The regularization parameter to be applied +* @return Updated loss +*/ + override def
[jira] [Commented] (FLINK-1979) Implement Loss Functions
[ https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309044#comment-15309044 ] ASF GitHub Bot commented on FLINK-1979: --- Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1985#discussion_r65291552 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/RegularizationPenalty.scala --- @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.optimization + +import org.apache.flink.ml.math.{Vector, BLAS} +import org.apache.flink.ml.math.Breeze._ +import breeze.linalg.{norm => BreezeNorm} + +/** Represents a type of regularization penalty + * + * Regularization penalties are used to restrict the optimization problem to solutions with + * certain desirable characteristics, such as sparsity for the L1 penalty, or penalizing large + * weights for the L2 penalty. + * + * The regularization term, `R(w)` is added to the objective function, `f(w) = L(w) + lambda*R(w)` + * where lambda is the regularization parameter used to tune the amount of regularization applied. + */ +trait RegularizationPenalty extends Serializable { + + /** Calculates the new weights based on the gradient and regularization penalty +* +* @param weightVector The weights to be updated +* @param gradient The gradient used to update the weights +* @param regularizationConstant The regularization parameter to be applied +* @param learningRate The effective step size for this iteration +* @return Updated weights +*/ + def takeStep( + weightVector: Vector, + gradient: Vector, + regularizationConstant: Double, + learningRate: Double) +: Vector + + /** Adds regularization to the loss value +* +* @param oldLoss The loss to be updated +* @param weightVector The gradient used to update the loss +* @param regularizationConstant The regularization parameter to be applied +* @return Updated loss +*/ + def regLoss(oldLoss: Double, weightVector: Vector, regularizationConstant: Double): Double + +} + + +/** `L_2` regularization penalty. + * + * The regularization function is the square of the L2 norm `1/2*||w||_2^2` + * with `w` being the weight vector. The function penalizes large weights, + * favoring solutions with more small weights rather than few large ones. + */ +object L2Regularization extends RegularizationPenalty { + + /** Calculates the new weights based on the gradient and L2 regularization penalty +* +* The updated weight is `w - learningRate *(gradient + lambda * w)` where +* `w` is the weight vector, and `lambda` is the regularization parameter. 
+* +* @param weightVector The weights to be updated +* @param gradient The gradient according to which we will update the weights +* @param regularizationConstant The regularization parameter to be applied +* @param learningRate The effective step size for this iteration +* @return Updated weights +*/ + override def takeStep( + weightVector: Vector, + gradient: Vector, + regularizationConstant: Double, + learningRate: Double) +: Vector = { +// add the gradient of the L2 regularization +BLAS.axpy(regularizationConstant, weightVector, gradient) + +// update the weights according to the learning rate +BLAS.axpy(-learningRate, gradient, weightVector) + +weightVector + } + + /** Adds regularization to the loss value +* +* The updated loss is `oldLoss + lambda * 1/2*||w||_2^2` where +* `w` is the weight vector, and `lambda` is the regularization parameter +* +* @param oldLoss The loss to
[jira] [Commented] (FLINK-1979) Implement Loss Functions
[ https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309043#comment-15309043 ] ASF GitHub Bot commented on FLINK-1979: --- Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1985#discussion_r65291461 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala --- @@ -47,21 +47,106 @@ object SquaredLoss extends PartialLossFunction { /** Calculates the loss depending on the label and the prediction * -* @param prediction -* @param label -* @return +* @param prediction The predicted value +* @param label The true value +* @return The loss */ override def loss(prediction: Double, label: Double): Double = { 0.5 * (prediction - label) * (prediction - label) } /** Calculates the derivative of the [[PartialLossFunction]] * -* @param prediction -* @param label -* @return +* @param prediction The predicted value +* @param label The true value +* @return The derivative of the loss function */ override def derivative(prediction: Double, label: Double): Double = { (prediction - label) } } + +/** Logistic loss function which can be used with the [[GenericLossFunction]] + * + * + * The [[LogisticLoss]] function implements `log(1 + -exp(prediction*label))` + * for binary classification with label in {-1, 1} + */ +object LogisticLoss extends PartialLossFunction { + + /** Calculates the loss depending on the label and the prediction +* +* @param prediction The predicted value +* @param label The true value +* @return The loss +*/ + override def loss(prediction: Double, label: Double): Double = { +val z = prediction * label + +// based on implementation in scikit-learn +// approximately equal and saves the computation of the log +if (z > 18) { + return math.exp(-z) +} +else if (z < -18) { + return -z +} + +math.log(1 + math.exp(-z)) + } + + /** Calculates the derivative of the loss function with respect to the prediction +* +* @param prediction The predicted value +* @param label The true value +* @return The derivative of the loss function +*/ + override def derivative(prediction: Double, label: Double): Double = { +val z = prediction * label + +// based on implementation in scikit-learn +// approximately equal and saves the computation of the log +if (z > 18) { + return -label * math.exp(-z) +} +else if (z < -18) { + return -label +} + +-label/(math.exp(z) + 1) + } --- End diff -- As I said above, following is better: ```scala if (z > 18) { -label * math.exp(-z) } else if (z < -18) { -label } else { -label / (math.exp(z) + 1) } ``` > Implement Loss Functions > > > Key: FLINK-1979 > URL: https://issues.apache.org/jira/browse/FLINK-1979 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library >Reporter: Johannes Günther >Assignee: Johannes Günther >Priority: Minor > Labels: ML > > For convex optimization problems, optimizer methods like SGD rely on a > pluggable implementation of a loss function and its first derivative. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1985#discussion_r65291461 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala --- @@ -47,21 +47,106 @@ object SquaredLoss extends PartialLossFunction { /** Calculates the loss depending on the label and the prediction * -* @param prediction -* @param label -* @return +* @param prediction The predicted value +* @param label The true value +* @return The loss */ override def loss(prediction: Double, label: Double): Double = { 0.5 * (prediction - label) * (prediction - label) } /** Calculates the derivative of the [[PartialLossFunction]] * -* @param prediction -* @param label -* @return +* @param prediction The predicted value +* @param label The true value +* @return The derivative of the loss function */ override def derivative(prediction: Double, label: Double): Double = { (prediction - label) } } + +/** Logistic loss function which can be used with the [[GenericLossFunction]] + * + * + * The [[LogisticLoss]] function implements `log(1 + -exp(prediction*label))` + * for binary classification with label in {-1, 1} + */ +object LogisticLoss extends PartialLossFunction { + + /** Calculates the loss depending on the label and the prediction +* +* @param prediction The predicted value +* @param label The true value +* @return The loss +*/ + override def loss(prediction: Double, label: Double): Double = { +val z = prediction * label + +// based on implementation in scikit-learn +// approximately equal and saves the computation of the log +if (z > 18) { + return math.exp(-z) +} +else if (z < -18) { + return -z +} + +math.log(1 + math.exp(-z)) + } + + /** Calculates the derivative of the loss function with respect to the prediction +* +* @param prediction The predicted value +* @param label The true value +* @return The derivative of the loss function +*/ + override def derivative(prediction: Double, label: Double): Double = { +val z = prediction * label + +// based on implementation in scikit-learn +// approximately equal and saves the computation of the log +if (z > 18) { + return -label * math.exp(-z) +} +else if (z < -18) { + return -label +} + +-label/(math.exp(z) + 1) + } --- End diff -- As I said above, following is better: ```scala if (z > 18) { -label * math.exp(-z) } else if (z < -18) { -label } else { -label / (math.exp(z) + 1) } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1985#discussion_r65291353 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala --- @@ -47,21 +47,106 @@ object SquaredLoss extends PartialLossFunction { /** Calculates the loss depending on the label and the prediction * -* @param prediction -* @param label -* @return +* @param prediction The predicted value +* @param label The true value +* @return The loss */ override def loss(prediction: Double, label: Double): Double = { 0.5 * (prediction - label) * (prediction - label) } /** Calculates the derivative of the [[PartialLossFunction]] * -* @param prediction -* @param label -* @return +* @param prediction The predicted value +* @param label The true value +* @return The derivative of the loss function */ override def derivative(prediction: Double, label: Double): Double = { (prediction - label) } } + +/** Logistic loss function which can be used with the [[GenericLossFunction]] + * + * + * The [[LogisticLoss]] function implements `log(1 + -exp(prediction*label))` + * for binary classification with label in {-1, 1} + */ +object LogisticLoss extends PartialLossFunction { + + /** Calculates the loss depending on the label and the prediction +* +* @param prediction The predicted value +* @param label The true value +* @return The loss +*/ + override def loss(prediction: Double, label: Double): Double = { +val z = prediction * label + +// based on implementation in scikit-learn +// approximately equal and saves the computation of the log +if (z > 18) { + return math.exp(-z) +} +else if (z < -18) { + return -z +} + +math.log(1 + math.exp(-z)) --- End diff -- Using `return` is not recommended in Scala. Could you change this like following? ```scala if (z > 18) { math.exp(-z) } else if (z < -18) { -z } else { math.log(1 + math.exp(-z)) } ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1979) Implement Loss Functions
[ https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309038#comment-15309038 ] ASF GitHub Bot commented on FLINK-1979: --- Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/1985#discussion_r65291353 --- Diff: flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala --- @@ -47,21 +47,106 @@ object SquaredLoss extends PartialLossFunction { /** Calculates the loss depending on the label and the prediction * -* @param prediction -* @param label -* @return +* @param prediction The predicted value +* @param label The true value +* @return The loss */ override def loss(prediction: Double, label: Double): Double = { 0.5 * (prediction - label) * (prediction - label) } /** Calculates the derivative of the [[PartialLossFunction]] * -* @param prediction -* @param label -* @return +* @param prediction The predicted value +* @param label The true value +* @return The derivative of the loss function */ override def derivative(prediction: Double, label: Double): Double = { (prediction - label) } } + +/** Logistic loss function which can be used with the [[GenericLossFunction]] + * + * + * The [[LogisticLoss]] function implements `log(1 + -exp(prediction*label))` + * for binary classification with label in {-1, 1} + */ +object LogisticLoss extends PartialLossFunction { + + /** Calculates the loss depending on the label and the prediction +* +* @param prediction The predicted value +* @param label The true value +* @return The loss +*/ + override def loss(prediction: Double, label: Double): Double = { +val z = prediction * label + +// based on implementation in scikit-learn +// approximately equal and saves the computation of the log +if (z > 18) { + return math.exp(-z) +} +else if (z < -18) { + return -z +} + +math.log(1 + math.exp(-z)) --- End diff -- Using `return` is not recommended in Scala. Could you change this like following? ```scala if (z > 18) { math.exp(-z) } else if (z < -18) { -z } else { math.log(1 + math.exp(-z)) } ``` > Implement Loss Functions > > > Key: FLINK-1979 > URL: https://issues.apache.org/jira/browse/FLINK-1979 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library >Reporter: Johannes Günther >Assignee: Johannes Günther >Priority: Minor > Labels: ML > > For convex optimization problems, optimizer methods like SGD rely on a > pluggable implementation of a loss function and its first derivative. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-3994) Instable KNNITSuite
[ https://issues.apache.org/jira/browse/FLINK-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chiwan Park closed FLINK-3994. -- Resolution: Fixed Fixed via aedc0fd481a756ef2b1708f896d5500475715232. Thanks for guidance [~StephanEwen] and [~mxm]! :-) > Instable KNNITSuite > --- > > Key: FLINK-3994 > URL: https://issues.apache.org/jira/browse/FLINK-3994 > Project: Flink > Issue Type: Bug > Components: Machine Learning Library, Tests >Affects Versions: 1.1.0 >Reporter: Chiwan Park >Assignee: Chiwan Park >Priority: Critical > Labels: test-stability > Fix For: 1.1.0 > > > KNNITSuite fails in Travis-CI with following error: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > ... > Cause: java.io.IOException: Insufficient number of network buffers: > required 32, but only 4 available. The total number of network buffers is > currently set to 2048. You can increase this number by setting the > configuration key 'taskmanager.network.numberOfBuffers'. > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196) > at > org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:497) > at java.lang.Thread.run(Thread.java:745) > ... > {code} > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064237/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064236/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064235/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134052961/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
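The exception message above already points at the usual workaround for local runs: raising the number of network buffers. A hedged sketch of that workaround (this is what the error message suggests, not necessarily the fix that was merged for this ticket), assuming the standard batch API:

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class LocalEnvWithMoreBuffers {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Configuration key taken from the error message; 4096 is an arbitrary example value
        config.setInteger("taskmanager.network.numberOfBuffers", 4096);

        // Local environment that picks up the custom configuration
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);

        env.fromElements(1, 2, 3).print();
    }
}
```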
[GitHub] flink pull request: [FLINK-3994] [ml, tests] Fix flaky KNN integration tests
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2056
[jira] [Commented] (FLINK-3994) Instable KNNITSuite
[ https://issues.apache.org/jira/browse/FLINK-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309020#comment-15309020 ] ASF GitHub Bot commented on FLINK-3994: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2056 > Instable KNNITSuite > --- > > Key: FLINK-3994 > URL: https://issues.apache.org/jira/browse/FLINK-3994 > Project: Flink > Issue Type: Bug > Components: Machine Learning Library, Tests >Affects Versions: 1.1.0 >Reporter: Chiwan Park >Assignee: Chiwan Park >Priority: Critical > Labels: test-stability > Fix For: 1.1.0 > > > KNNITSuite fails in Travis-CI with following error: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > ... > Cause: java.io.IOException: Insufficient number of network buffers: > required 32, but only 4 available. The total number of network buffers is > currently set to 2048. You can increase this number by setting the > configuration key 'taskmanager.network.numberOfBuffers'. > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196) > at > org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:497) > at java.lang.Thread.run(Thread.java:745) > ... > {code} > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064237/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064236/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064235/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134052961/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3994] [ml, tests] Fix flaky KNN integration tests
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/2056 Merging...
[jira] [Commented] (FLINK-3994) Instable KNNITSuite
[ https://issues.apache.org/jira/browse/FLINK-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309017#comment-15309017 ] ASF GitHub Bot commented on FLINK-3994: --- Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/2056 Merging... > Instable KNNITSuite > --- > > Key: FLINK-3994 > URL: https://issues.apache.org/jira/browse/FLINK-3994 > Project: Flink > Issue Type: Bug > Components: Machine Learning Library, Tests >Affects Versions: 1.1.0 >Reporter: Chiwan Park >Assignee: Chiwan Park >Priority: Critical > Labels: test-stability > Fix For: 1.1.0 > > > KNNITSuite fails in Travis-CI with following error: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > ... > Cause: java.io.IOException: Insufficient number of network buffers: > required 32, but only 4 available. The total number of network buffers is > currently set to 2048. You can increase this number by setting the > configuration key 'taskmanager.network.numberOfBuffers'. > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196) > at > org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:497) > at java.lang.Thread.run(Thread.java:745) > ... > {code} > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064237/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064236/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064235/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134052961/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3758) Add possibility to register accumulators in custom triggers
[ https://issues.apache.org/jira/browse/FLINK-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308581#comment-15308581 ] ASF GitHub Bot commented on FLINK-3758: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1979 How about tagging the new method with `@PublicEvolving`? Since it's a new API anyway, it's good not to make it part of the stable interfaces. With Flink 2.0, we can remove the `Evolving` part ;) > Add possibility to register accumulators in custom triggers > --- > > Key: FLINK-3758 > URL: https://issues.apache.org/jira/browse/FLINK-3758 > Project: Flink > Issue Type: Improvement >Reporter: Konstantin Knauf >Assignee: Konstantin Knauf >Priority: Minor > > For monitoring purposes it would be nice to be able to use accumulators in > custom trigger functions. > Basically, the trigger context could just expose {{getAccumulator}} of > {{RuntimeContext}} or does this create problems I am not aware of? > Adding accumulators in a trigger function is more difficult, I think, but > that's not really necessary as the accumulator could just be added in some > other upstream operator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3758] Add possibility to register accumulators in...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1979 How about tagging the new method with `@PublicEvolving`? Since it's a new API anyway, it's good not to make it part of the stable interfaces. With Flink 2.0, we can remove the `Evolving` part ;)
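For illustration, a minimal sketch of what the suggestion amounts to: the newly added trigger-context method carries the annotation so it stays outside the stable-API guarantees. The interface name and the exact signature are hypothetical here (modeled on `RuntimeContext#getAccumulator`); only the use of `@PublicEvolving` is the point:

```java
import java.io.Serializable;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.accumulators.Accumulator;

public interface TriggerContextSketch {

    // Marked as evolving so the method can still change before it becomes part of the stable API
    @PublicEvolving
    <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name);
}
```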
[jira] [Commented] (FLINK-3054) Remove R (return) type variable from SerializationSchema
[ https://issues.apache.org/jira/browse/FLINK-3054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308531#comment-15308531 ] ASF GitHub Bot commented on FLINK-3054: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1406 I think you can use this class as a reference: https://github.com/apache/flink/blob/2c507f93300ad1b53c477e0939f1f2232187b37f/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java > Remove R (return) type variable from SerializationSchema > > > Key: FLINK-3054 > URL: https://issues.apache.org/jira/browse/FLINK-3054 > Project: Flink > Issue Type: Bug > Components: Streaming >Reporter: Robert Metzger >Assignee: Robert Metzger > Fix For: 1.0.0 > > > For the upcoming 1.0 release, I would like to remove the R type parameter > from the {{SerializationSchema}} class. > The {{DeserializationSchema}} doesn't have a variable input type either and I > think its okay to serialize and deserialize to byte[] all the time through > these interfaces. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3054] Remove R (return) type variable from Serial...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/1406 I think you can use this class as a reference: https://github.com/apache/flink/blob/2c507f93300ad1b53c477e0939f1f2232187b37f/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
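For context, this is roughly the shape `SerializationSchema` takes once the `R` type variable is removed, as described in the issue: serialization always produces a `byte[]`, mirroring `DeserializationSchema`. A sketch, not the exact final source:

```java
import java.io.Serializable;

public interface SerializationSchema<T> extends Serializable {

    /** Serializes the incoming element into a byte array. */
    byte[] serialize(T element);
}
```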
[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters
[ https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308525#comment-15308525 ] ASF GitHub Bot commented on FLINK-3763: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/2054 Thank you for your contribution. There are still some open issues that need addressing before we can merge this. > RabbitMQ Source/Sink standardize connection parameters > -- > > Key: FLINK-3763 > URL: https://issues.apache.org/jira/browse/FLINK-3763 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.0.1 >Reporter: Robert Batts >Assignee: Subhankar Biswas > > The RabbitMQ source and sink should have the same capabilities in terms of > establishing a connection, currently the sink is lacking connection > parameters that are available on the source. Additionally, VirtualHost should > be an offered parameter for multi-tenant RabbitMQ clusters (if not specified > it goes to the vhost '/'). > Connection Parameters > === > - Host - Offered on both > - Port - Source only > - Virtual Host - Neither > - User - Source only > - Password - Source only > Additionally, it might be worth offer the URI as a valid constructor because > that would offer all 5 of the above parameters in a single String. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
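To make the proposed standardization concrete, here is a hypothetical usage sketch of the `RMQConnectionConfig.Builder` from the pull request. Only `setUri(String)` appears verbatim in the diff; the other setter names are assumptions inferred from the config fields (host, port, virtualHost, username, password):

```java
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

public class RMQConfigExample {

    // Builder-style configuration with the individual parameters (setter names assumed)
    public static RMQConnectionConfig buildConfig() {
        return new RMQConnectionConfig.Builder()
            .setHost("localhost")
            .setPort(5672)
            .setVirtualHost("/")
            .setUserName("guest")
            .setPassword("guest")
            .build();
    }

    // Alternatively, a single AMQP URI carries all five parameters, as the issue suggests
    public static RMQConnectionConfig buildConfigFromUri() {
        return new RMQConnectionConfig.Builder()
            .setUri("amqp://guest:guest@localhost:5672/%2F")
            .build();
    }
}
```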
[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/2054 Thank you for your contribution. There are still some open issues that need addressing before we can merge this.
[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters
[ https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308520#comment-15308520 ] ASF GitHub Bot commented on FLINK-3763: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2054#discussion_r65253337 --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -35,24 +36,28 @@ private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); private String QUEUE_NAME; - private String HOST_NAME; + private RMQConnectionConfig rmqConnectionConfig; private transient ConnectionFactory factory; private transient Connection connection; private transient Channel channel; private SerializationSchema schema; - public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema schema) { - this.HOST_NAME = HOST_NAME; + /** +* @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}. +* @param QUEUE_NAME The queue to publish messages to. +* @param schema A {@link SerializationSchema} for turning the Java objects received into bytes + */ + public RMQSink(RMQConnectionConfig rmqConnectionConfig, String QUEUE_NAME, SerializationSchema schema) { + this.rmqConnectionConfig = rmqConnectionConfig; this.QUEUE_NAME = QUEUE_NAME; this.schema = schema; } /** * Initializes the connection to RMQ. */ - public void initializeConnection() { - factory = new ConnectionFactory(); - factory.setHost(HOST_NAME); + public void initializeConnection() throws Exception { + factory = rmqConnectionConfig.getConnectionFactory(); --- End diff -- Another comment which is something that has been like this before: Could you remove the initializeConnection() method and move all its content into the open() method. That reduces unnecessary boilerplate. Thank you ;) > RabbitMQ Source/Sink standardize connection parameters > -- > > Key: FLINK-3763 > URL: https://issues.apache.org/jira/browse/FLINK-3763 > Project: Flink > Issue Type: Improvement > Components: Streaming Connectors >Affects Versions: 1.0.1 >Reporter: Robert Batts >Assignee: Subhankar Biswas > > The RabbitMQ source and sink should have the same capabilities in terms of > establishing a connection, currently the sink is lacking connection > parameters that are available on the source. Additionally, VirtualHost should > be an offered parameter for multi-tenant RabbitMQ clusters (if not specified > it goes to the vhost '/'). > Connection Parameters > === > - Host - Offered on both > - Port - Source only > - Virtual Host - Neither > - User - Source only > - Password - Source only > Additionally, it might be worth offer the URI as a valid constructor because > that would offer all 5 of the above parameters in a single String. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2054#discussion_r65253122 --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -35,24 +36,28 @@ private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); private String QUEUE_NAME; --- End diff -- I know it's not part of the issue's scope, but could you rename this field to `queueName`? Otherwise the class has a very poor style, which makes it hard to read. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2054#discussion_r65253162 --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java --- @@ -35,24 +36,28 @@ private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class); private String QUEUE_NAME; - private String HOST_NAME; + private RMQConnectionConfig rmqConnectionConfig; private transient ConnectionFactory factory; private transient Connection connection; private transient Channel channel; private SerializationSchema schema; - public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema schema) { - this.HOST_NAME = HOST_NAME; + /** +* @param rmqConnectionConfig The RabbiMQ connection configuration {@link RMQConnectionConfig}. +* @param QUEUE_NAME The queue to publish messages to. +* @param schema A {@link SerializationSchema} for turning the Java objects received into bytes + */ + public RMQSink(RMQConnectionConfig rmqConnectionConfig, String QUEUE_NAME, SerializationSchema schema) { --- End diff -- Same here with the QUEUE_NAME --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
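For reference, the RMQSink review comments above (moving the initializeConnection() logic into open(), and renaming the QUEUE_NAME field and constructor parameter to queueName) would roughly amount to the sketch below. This is only an illustration, not the actual patch: the import locations follow the Flink 1.x layout of the time, the class and method bodies are simplified, and error handling is omitted.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class RMQSinkSketch<IN> extends RichSinkFunction<IN> {

    private final RMQConnectionConfig rmqConnectionConfig;
    private final String queueName;               // camelCase instead of QUEUE_NAME
    private final SerializationSchema<IN> schema;

    private transient ConnectionFactory factory;
    private transient Connection connection;
    private transient Channel channel;

    public RMQSinkSketch(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema<IN> schema) {
        this.rmqConnectionConfig = rmqConnectionConfig;
        this.queueName = queueName;
        this.schema = schema;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // formerly initializeConnection(): the connection setup now lives directly in open()
        factory = rmqConnectionConfig.getConnectionFactory();
        connection = factory.newConnection();
        channel = connection.createChannel();
        channel.queueDeclare(queueName, false, false, false, null);
    }

    @Override
    public void invoke(IN value) throws Exception {
        channel.basicPublish("", queueName, null, schema.serialize(value));
    }

    @Override
    public void close() throws Exception {
        channel.close();
        connection.close();
    }
}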
[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2054#discussion_r65253016 --- Diff: docs/apis/streaming/connectors/rabbitmq.md --- @@ -71,23 +71,25 @@ Example: {% highlight java %} +RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().build(); --- End diff -- As I've written below, I would prefer if users would not rely on the fact that "localhost" is the default value here. Once the builder has been changed, we would expect the user to call `.setHost("localhost")` here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
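A hedged sketch of what the documentation example would look like once the builder no longer relies on an implicit default host, as the reviewer suggests. The setter names mirror the fields of the RMQConnectionConfig class quoted elsewhere in this review and are assumptions rather than the final API; the values are simply the usual RabbitMQ defaults standing in for real ones.

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5672)
    .setVirtualHost("/")
    .setUserName("guest")
    .setPassword("guest")
    .build();

RMQSource and RMQSink would then both receive this connectionConfig in their constructors, so the source and the sink share one set of connection parameters.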
[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2054#discussion_r65252614 --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java --- @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq.common; + +import com.google.common.base.Preconditions; +import com.rabbitmq.client.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +/** + * Connection Configuration for RMQ. + * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection or + * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, String, String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection + */ +public class RMQConnectionConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class); + + private String host; + private int port; + private String virtualHost; + private String username; + private String password; + private String uri; + + private int networkRecoveryInterval; + private boolean automaticRecovery; + private boolean topologyRecovery; + + private int connectionTimeout; + private int requestedChannelMax; + private int requestedFrameMax; + private int requestedHeartbeat; + + /** +* +* @param host host name +* @param port port +* @param virtualHost virtual host +* @param username username +* @param password password + +* @param networkRecoveryInterval connection recovery interval in milliseconds +* @param automaticRecovery if automatic connection recovery +* @param topologyRecovery if topology recovery +* @param connectionTimeout connection timeout +* @param requestedChannelMax requested maximum channel number +* @param requestedFrameMax requested maximum frame size +* @param requestedHeartbeat requested heartbeat interval +* @throws NullPointerException if host or virtual host or username or password is null + */ + private RMQConnectionConfig(String host, int port, String virtualHost, String username, String password, + int networkRecoveryInterval, boolean automaticRecovery, + boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax, + int requestedHeartbeat){ + Preconditions.checkNotNull(host, "host can not be null"); + Preconditions.checkNotNull(virtualHost, "virtualHost can not be null"); + Preconditions.checkNotNull(username, "username can not be 
null"); + Preconditions.checkNotNull(password, "password can not be null"); + this.host = host; + this.port = port; + this.virtualHost = virtualHost; + this.username = username; + this.password = password; + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + } + + /** +* +* @param uri
[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2054#discussion_r65252489 --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java --- @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq.common; + +import com.google.common.base.Preconditions; +import com.rabbitmq.client.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +/** + * Connection Configuration for RMQ. + * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection or + * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, String, String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection + */ +public class RMQConnectionConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class); + + private String host; + private int port; + private String virtualHost; + private String username; + private String password; + private String uri; + + private int networkRecoveryInterval; + private boolean automaticRecovery; + private boolean topologyRecovery; + + private int connectionTimeout; + private int requestedChannelMax; + private int requestedFrameMax; + private int requestedHeartbeat; + + /** +* +* @param host host name +* @param port port +* @param virtualHost virtual host +* @param username username +* @param password password + +* @param networkRecoveryInterval connection recovery interval in milliseconds +* @param automaticRecovery if automatic connection recovery +* @param topologyRecovery if topology recovery +* @param connectionTimeout connection timeout +* @param requestedChannelMax requested maximum channel number +* @param requestedFrameMax requested maximum frame size +* @param requestedHeartbeat requested heartbeat interval +* @throws NullPointerException if host or virtual host or username or password is null + */ + private RMQConnectionConfig(String host, int port, String virtualHost, String username, String password, + int networkRecoveryInterval, boolean automaticRecovery, + boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax, + int requestedHeartbeat){ + Preconditions.checkNotNull(host, "host can not be null"); + Preconditions.checkNotNull(virtualHost, "virtualHost can not be null"); + Preconditions.checkNotNull(username, "username can not be 
null"); + Preconditions.checkNotNull(password, "password can not be null"); + this.host = host; + this.port = port; + this.virtualHost = virtualHost; + this.username = username; + this.password = password; + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + } + + /** +* +* @param uri
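The class comment quoted in this diff says that when Builder#setUri(String) has been used, the URI-based constructor initializes the RMQ connection instead of the host/port/credentials one. A hedged sketch of that path follows; the AMQP URI is only an example, encoding user, password, host, port and the default vhost "/" (percent-encoded as %2F).

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setUri("amqp://guest:guest@localhost:5672/%2F")
    .build();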
[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2054#discussion_r65251121 --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java --- @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq.common; + +import com.google.common.base.Preconditions; +import com.rabbitmq.client.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +/** + * Connection Configuration for RMQ. + * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection or + * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, String, String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection + */ +public class RMQConnectionConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class); + + private String host; + private int port; + private String virtualHost; + private String username; + private String password; + private String uri; + + private int networkRecoveryInterval; + private boolean automaticRecovery; + private boolean topologyRecovery; + + private int connectionTimeout; + private int requestedChannelMax; + private int requestedFrameMax; + private int requestedHeartbeat; + + /** +* +* @param host host name +* @param port port +* @param virtualHost virtual host +* @param username username +* @param password password + +* @param networkRecoveryInterval connection recovery interval in milliseconds +* @param automaticRecovery if automatic connection recovery +* @param topologyRecovery if topology recovery +* @param connectionTimeout connection timeout +* @param requestedChannelMax requested maximum channel number +* @param requestedFrameMax requested maximum frame size +* @param requestedHeartbeat requested heartbeat interval +* @throws NullPointerException if host or virtual host or username or password is null + */ + private RMQConnectionConfig(String host, int port, String virtualHost, String username, String password, + int networkRecoveryInterval, boolean automaticRecovery, + boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax, + int requestedHeartbeat){ + Preconditions.checkNotNull(host, "host can not be null"); + Preconditions.checkNotNull(virtualHost, "virtualHost can not be null"); + Preconditions.checkNotNull(username, "username can not be 
null"); + Preconditions.checkNotNull(password, "password can not be null"); + this.host = host; + this.port = port; + this.virtualHost = virtualHost; + this.username = username; + this.password = password; + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + } + + /** +* +* @param uri
[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2054#discussion_r65250839 --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java --- @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq.common; + +import com.google.common.base.Preconditions; +import com.rabbitmq.client.ConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +/** + * Connection Configuration for RMQ. + * If {@link Builder#setUri(String)} has been set then {@link RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection or + * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, String, String, int, boolean, boolean, int, int, int, int)} + * will be used for initialize the RMQ connection + */ +public class RMQConnectionConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class); + + private String host; + private int port; + private String virtualHost; + private String username; + private String password; + private String uri; + + private int networkRecoveryInterval; + private boolean automaticRecovery; + private boolean topologyRecovery; + + private int connectionTimeout; + private int requestedChannelMax; + private int requestedFrameMax; + private int requestedHeartbeat; + + /** +* +* @param host host name +* @param port port +* @param virtualHost virtual host +* @param username username +* @param password password + +* @param networkRecoveryInterval connection recovery interval in milliseconds +* @param automaticRecovery if automatic connection recovery +* @param topologyRecovery if topology recovery +* @param connectionTimeout connection timeout +* @param requestedChannelMax requested maximum channel number +* @param requestedFrameMax requested maximum frame size +* @param requestedHeartbeat requested heartbeat interval +* @throws NullPointerException if host or virtual host or username or password is null + */ + private RMQConnectionConfig(String host, int port, String virtualHost, String username, String password, + int networkRecoveryInterval, boolean automaticRecovery, + boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax, + int requestedHeartbeat){ + Preconditions.checkNotNull(host, "host can not be null"); + Preconditions.checkNotNull(virtualHost, "virtualHost can not be null"); + Preconditions.checkNotNull(username, "username can not be 
null"); + Preconditions.checkNotNull(password, "password can not be null"); + this.host = host; + this.port = port; + this.virtualHost = virtualHost; + this.username = username; + this.password = password; + + this.networkRecoveryInterval = networkRecoveryInterval; + this.automaticRecovery = automaticRecovery; + this.topologyRecovery = topologyRecovery; + this.connectionTimeout = connectionTimeout; + this.requestedChannelMax = requestedChannelMax; + this.requestedFrameMax = requestedFrameMax; + this.requestedHeartbeat = requestedHeartbeat; + } + + /** +* +* @param uri
[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/2054#discussion_r65250708 --- Diff: flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java --- @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq.common; + +import com.google.common.base.Preconditions; --- End diff -- Could you use Flink's own `Preconditions` class in org.apache.flink.util? We are trying to get rid of guava in the long run. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
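The change the reviewer asks for amounts to swapping the import: org.apache.flink.util.Preconditions provides the same checkNotNull(reference, errorMessage) shape as the Guava class used in the diff, so the quoted call sites can stay as they are. A minimal sketch, with an illustrative helper name:

import org.apache.flink.util.Preconditions;   // instead of com.google.common.base.Preconditions

public class PreconditionsSwapSketch {
    static void validate(String host, String virtualHost, String username, String password) {
        // identical call shape to the private constructor quoted in the diff above
        Preconditions.checkNotNull(host, "host can not be null");
        Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
        Preconditions.checkNotNull(username, "username can not be null");
        Preconditions.checkNotNull(password, "password can not be null");
    }
}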
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r65231707 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.hash; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeutils.SameTypePairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.RandomAccessInputView; +import org.apache.flink.runtime.memory.AbstractPagedOutputView; +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * This hash table supports updating elements, and it also has processRecordWithReduce, + * which performs one reduce step with the given record. + * + * The memory is divided into three areas: + * - Bucket area: this contains the bucket heads: + * an 8 byte pointer to the first link of a linked list in the record area + * - Record area: this contains the actual data in linked list elements. A linked list element starts + * with an 8 byte pointer to the next element, and then the record follows. + * - Staging area: This is a small, temporary storage area for writing updated records. This is needed, + * because before serializing a record, there is no way to know in advance how large it will be. + * Therefore, we can't serialize directly into the record area when we are doing an update, because + * if it turns out to be larger than the old record, then it would overwrite some other record + * that happens to be after the old one in memory. The solution is to serialize to the staging area first, + * and then copy it to the place of the original if it has the same size; otherwise allocate a new linked + * list element at the end of the record area, and mark the old one as abandoned. This creates "holes" in + * the record area, so compactions are eventually needed. + * + * Compaction happens by deleting everything in the bucket area, and then reinserting all elements.
+ * The reinsertion happens by forgetting the structure (the linked lists) of the record area, reading it + * sequentially, and inserting all non-abandoned records, starting from the beginning of the record area. + * Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because + * both the insertions and the readings happen sequentially in the record area, and the insertions obviously + * never overtake the reading sweep. + * + * Note: we have to abandon the old linked list element even when the updated record has a smaller size + * than the original, because otherwise we wouldn't know where the next record starts during a reinsertion + * sweep. + * + * The number of buckets depends on how large the records are. The serializer might be able to tell us this, + * in which case we calculate the number of buckets upfront and won't do resizes. + * If the serializer doesn't know the size, then we start with a small number of buckets, and do resizes as more + * elements are inserted than the number of buckets. + * + * The number of memory segments given to the staging area is usually one, because it just needs to hold + * one record. + *
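The staging-area rule in the Javadoc above can be illustrated with a small, runnable model (assumed names, simplified to byte arrays in a list; the real implementation serializes into paged memory):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class StagingUpdateSketch {

    // Toy record area: each slot is one linked-list element's payload, and
    // a parallel flag list marks abandoned elements ("holes").
    static final List<byte[]> recordArea = new ArrayList<>();
    static final List<Boolean> abandoned = new ArrayList<>();

    static int append(byte[] rec) {
        recordArea.add(rec);
        abandoned.add(false);
        return recordArea.size() - 1;
    }

    // Returns the element index holding the record after the update.
    static int update(int index, String newValue) {
        // Serialize to a staging buffer first: the size is unknown upfront,
        // so we must not write straight into the record area.
        byte[] staged = newValue.getBytes(StandardCharsets.UTF_8);
        if (staged.length == recordArea.get(index).length) {
            // Same size: safe to copy over the old record in place.
            recordArea.set(index, staged);
            return index;
        }
        // Different size: append a new element at the end and abandon the
        // old one, leaving a hole for compaction to reclaim later.
        abandoned.set(index, true);
        return append(staged);
    }

    public static void main(String[] args) {
        int i = append("abc".getBytes(StandardCharsets.UTF_8));
        i = update(i, "xyz");  // same length: updated in place, i unchanged
        i = update(i, "wxyz"); // longer: appended, old element abandoned
        long holes = abandoned.stream().filter(b -> b).count();
        System.out.println("index=" + i + ", holes=" + holes); // index=1, holes=1
    }
}
```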
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r65231642 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java --- @@ -0,0 +1,1048 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.hash; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeutils.SameTypePairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.RandomAccessInputView; +import org.apache.flink.runtime.memory.AbstractPagedOutputView; +import org.apache.flink.util.MathUtils; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * This hash table supports updating elements, and it also has processRecordWithReduce, + * which performs one reduce step with the given record. + * + * The memory is divided into three areas: + * - Bucket area: this contains the bucket heads: + * an 8 byte pointer to the first link of a linked list in the record area + * - Record area: this contains the actual data in linked list elements. A linked list element starts + * with an 8 byte pointer to the next element, and then the record follows. + * - Staging area: This is a small, temporary storage area for writing updated records. This is needed, + * because before serializing a record, there is no way to know in advance how large it will be. + * Therefore, we can't serialize directly into the record area when we are doing an update, because + * if it turns out to be larger than the old record, then it would overwrite some other record + * that happens to be after the old one in memory. The solution is to serialize to the staging area first, + * and then copy it to the place of the original if it has the same size; otherwise allocate a new linked + * list element at the end of the record area, and mark the old one as abandoned. This creates "holes" in + * the record area, so compactions are eventually needed. + * + * Compaction happens by deleting everything in the bucket area, and then reinserting all elements.
+ * The reinsertion happens by forgetting the structure (the linked lists) of the record area, reading it + * sequentially, and inserting all non-abandoned records, starting from the beginning of the record area. + * Note that insertions never overwrite a record that has not been read by the reinsertion sweep, because + * both the insertions and the readings happen sequentially in the record area, and the insertions obviously + * never overtake the reading sweep. + * + * Note: we have to abandon the old linked list element even when the updated record has a smaller size + * than the original, because otherwise we wouldn't know where the next record starts during a reinsertion + * sweep. + * + * The number of buckets depends on how large the records are. The serializer might be able to tell us this, + * in which case we calculate the number of buckets upfront and won't do resizes. + * If the serializer doesn't know the size, then we start with a small number of buckets, and do resizes as more + * elements are inserted than the number of buckets. + * + * The number of memory segments given to the staging area is usually one, because it just needs to hold + * one record. + *
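The resize policy at the end of the Javadoc (grow once more elements have been inserted than there are buckets) amounts to keeping the load factor at most one. A toy, runnable version with assumed constants:

```java
public class ResizeSketch {

    int numBuckets = 16;   // assumed small initial size when record size is unknown
    long numElements = 0;

    void insert() {
        numElements++;
        if (numElements > numBuckets) {
            // Load factor exceeded 1: double the bucket array. In the real
            // table this is where the bucket heads are rebuilt and every
            // chain is rehashed into the larger array.
            numBuckets *= 2;
        }
    }

    public static void main(String[] args) {
        ResizeSketch t = new ResizeSketch();
        for (int i = 0; i < 100; i++) {
            t.insert();
        }
        System.out.println(t.numBuckets); // 128 after 100 insertions
    }
}
```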
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r65231516 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java --- @@ -114,85 +118,133 @@ public void prepare() throws Exception { MemoryManager memManager = this.taskContext.getMemoryManager(); final int numMemoryPages = memManager.computeNumberOfPages( - this.taskContext.getTaskConfig().getRelativeMemoryDriver()); + this.taskContext.getTaskConfig().getRelativeMemoryDriver()); this.memory = memManager.allocatePages(this.taskContext.getOwningNepheleTask(), numMemoryPages); - // instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter - if (this.comparator.supportsSerializationWithKeyNormalization() && - this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING) - { - this.sorter = new FixedLengthRecordSorter(this.serializer, this.comparator, memory); - } else { - this.sorter = new NormalizedKeySorter(this.serializer, this.comparator.duplicate(), memory); - } - ExecutionConfig executionConfig = taskContext.getExecutionConfig(); this.objectReuseEnabled = executionConfig.isObjectReuseEnabled(); if (LOG.isDebugEnabled()) { LOG.debug("ReduceCombineDriver object reuse: " + (this.objectReuseEnabled ? "ENABLED" : "DISABLED") + "."); } + + switch (strategy) { + case SORTED_PARTIAL_REDUCE: + // instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter + if (this.comparator.supportsSerializationWithKeyNormalization() && + this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING) { + this.sorter = new FixedLengthRecordSorter(this.serializer, this.comparator, memory); --- End diff -- 336490ccbcfc3b1a2be93eddd9cb531532a19a9e --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
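The sorter selection quoted in this hunk reduces to one predicate. Restated as a fragment (not compilable on its own; generics elided, and THRESHOLD_FOR_IN_PLACE_SORTING is 32 in the quoted code):

```java
// In-place sorting is only possible for fixed-length records
// (getLength() > 0) that are small (at most 32 bytes) and whose
// comparator supports serialization with key normalization.
boolean inPlace =
    comparator.supportsSerializationWithKeyNormalization()
        && serializer.getLength() > 0
        && serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING;

sorter = inPlace
    ? new FixedLengthRecordSorter(serializer, comparator, memory)
    : new NormalizedKeySorter(serializer, comparator.duplicate(), memory);
```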
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r65231501 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java --- @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.runtime.operators.chaining; + +import org.apache.flink.api.common.functions.Function; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.util.FunctionUtils; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.operators.DriverStrategy; +import org.apache.flink.runtime.operators.hash.ReduceHashTable; +import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter; +import org.apache.flink.runtime.operators.sort.InMemorySorter; +import org.apache.flink.runtime.operators.sort.NormalizedKeySorter; +import org.apache.flink.runtime.operators.sort.QuickSort; +import org.apache.flink.util.Collector; +import org.apache.flink.util.MutableObjectIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.util.List; + +/** + * Chained version of ReduceCombineDriver. + */ +public class ChainedReduceCombineDriver extends ChainedDriver{ + + private static final Logger LOG = LoggerFactory.getLogger(ChainedReduceCombineDriver.class); + + /** Fix length records with a length below this threshold will be in-place sorted, if possible. 
*/ + private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; + + + private AbstractInvokable parent; + + private TypeSerializer serializer; + + private TypeComparator comparator; + + private ReduceFunction reducer; + + private DriverStrategy strategy; + + private InMemorySorter sorter; + + private QuickSort sortAlgo = new QuickSort(); + + private ReduceHashTable table; + + private List memory; + + private volatile boolean canceled; + + // + + @Override + public Function getStub() { + return reducer; + } + + @Override + public String getTaskName() { + return taskName; + } + + @Override + public void setup(AbstractInvokable parent) { + this.parent = parent; + canceled = false; + + strategy = config.getDriverStrategy(); + + reducer = BatchTask.instantiateUserCode(config, userCodeClassLoader, ReduceFunction.class); + FunctionUtils.setFunctionRuntimeContext(reducer, getUdfRuntimeContext()); + } + + @Override + public void openTask() throws Exception { + // open the stub first + final Configuration stubConfig = config.getStubParameters(); + BatchTask.openUserCode(reducer, stubConfig); + + // instantiate the serializer / comparator + serializer = config.getInputSerializer(0, userCodeClassLoader).getSerializer(); + comparator = config.getDriverComparator(0, userCodeClassLoader).createComparator(); + + MemoryManager memManager = parent.getEnvironment().getMemoryManager(); + final int numMemoryPages = memManager.computeNumberOfPages(config.getRelativeMemoryDriver()); + memory = memManager.allocatePages(parent, numMemoryPages);
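The memory setup in openTask() converts a relative fraction of managed memory into a number of fixed-size pages. A back-of-envelope, runnable version with assumed numbers (the real computation lives in Flink's MemoryManager.computeNumberOfPages):

```java
public class PagesSketch {
    public static void main(String[] args) {
        long managedMemoryBytes = 512L * 1024 * 1024; // assumed 512 MB of managed memory
        int pageSize = 32 * 1024;                     // assumed 32 KB MemorySegments
        double relativeFraction = 0.25;               // what getRelativeMemoryDriver() returns

        // Equivalent of computeNumberOfPages: the driver's fraction of the
        // managed memory, expressed in whole pages.
        int numPages = (int) ((long) (managedMemoryBytes * relativeFraction) / pageSize);
        System.out.println(numPages + " pages for this driver"); // 4096
    }
}
```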
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308227#comment-15308227 ] ASF GitHub Bot commented on FLINK-3477: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r65231386 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java --- @@ -42,34 +44,38 @@ * Combine operator for Reduce functions, standalone (not chained). * Sorts and groups and reduces data, but never spills the sort. May produce multiple * partially aggregated groups. - * + * * @param The data type consumed and produced by the combiner. */ public class ReduceCombineDriver implements Driver{ - + private static final Logger LOG = LoggerFactory.getLogger(ReduceCombineDriver.class); /** Fix length records with a length below this threshold will be in-place sorted, if possible. */ private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; - - + + private TaskContext taskContext; private TypeSerializer serializer; private TypeComparator comparator; - + private ReduceFunction reducer; - + private Collector output; - + + private DriverStrategy strategy; + private InMemorySorter sorter; - + private QuickSort sortAlgo = new QuickSort(); + private ReduceHashTable table; + private List memory; - private boolean running; + private volatile boolean canceled; --- End diff -- > Can we keep the name of the flag as it is for now? All other drivers use a running flag as well. I would rather open a separate JIRA to fix the name in all drivers. 12e36ab93b7e7d94d497a6d718eba21ead813d7e And here is the Jira: https://issues.apache.org/jira/browse/FLINK-3999 > Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is incrementally applied to compute a final > aggregated value. This allows to hold the preaggregated value in a hash-table > and update it with each function call. > The hash-based strategy requires special implementation of an in-memory hash > table. The hash table should support in place updates of elements (if the > updated value has the same size as the new value) but also appending updates > with invalidation of the old value (if the binary length of the new value > differs). The hash table needs to be able to evict and emit all elements if > it runs out-of-memory. > We should also add {{HASH}} and {{SORT}} compiler hints to > {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the > execution strategy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
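The point being settled in this thread (whether the flag can simply be `volatile`) is easy to demonstrate: the driver loop runs on one thread while cancel() is invoked from another, so without `volatile` the loop may never observe the write. A minimal, runnable illustration, not Flink code:

```java
public class CancelFlagSketch {

    // Without volatile, the driver thread may cache the flag and never see
    // the write performed by the canceling thread.
    private volatile boolean canceled;

    void runDriver() throws InterruptedException {
        while (!canceled) {
            Thread.sleep(10); // stand-in for processing one record
        }
        System.out.println("driver observed the cancellation");
    }

    void cancel() {
        canceled = true; // called from a different thread than runDriver()
    }

    public static void main(String[] args) throws Exception {
        CancelFlagSketch driver = new CancelFlagSketch();
        Thread worker = new Thread(() -> {
            try {
                driver.runDriver();
            } catch (InterruptedException ignored) {
            }
        });
        worker.start();
        Thread.sleep(50);
        driver.cancel();
        worker.join();
    }
}
```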
[jira] [Created] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`
Gabor Gevay created FLINK-3999: -- Summary: Rename the `running` flag in the drivers to `canceled` Key: FLINK-3999 URL: https://issues.apache.org/jira/browse/FLINK-3999 Project: Flink Issue Type: Bug Components: Local Runtime Reporter: Gabor Gevay Priority: Trivial The name of the {{running}} flag in the drivers doesn't reflect its usage: when the operator stops normally, it is not running anymore, but the {{running}} flag will still be true, because the flag is only ever written (set to false) when the operator is cancelled. It should be renamed, and its value inverted. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
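Schematically, the proposed rename with the value inverted (a fragment with assumed surrounding code, not compilable on its own):

```java
// Before: "running" stays true after a normal finish, because the flag is
// only ever written by cancel(); the name promises more than it tracks.
while (this.running && input.nextKey()) { /* ... */ }

// After the proposed rename, with the value inverted, the flag says
// exactly what it means:
while (!this.canceled && input.nextKey()) { /* ... */ }
```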
[jira] [Commented] (FLINK-1979) Implement Loss Functions
[ https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308199#comment-15308199 ] ASF GitHub Bot commented on FLINK-1979: --- Github user skavulya commented on the pull request: https://github.com/apache/flink/pull/1985 @tillrohrmann @thvasilo I made the changes you recommended. Please let me know if they look ok. > Implement Loss Functions > > > Key: FLINK-1979 > URL: https://issues.apache.org/jira/browse/FLINK-1979 > Project: Flink > Issue Type: Improvement > Components: Machine Learning Library >Reporter: Johannes Günther >Assignee: Johannes Günther >Priority: Minor > Labels: ML > > For convex optimization problems, optimizer methods like SGD rely on a > pluggable implementation of a loss function and its first derivative. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
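For squared loss, L(p, y) = 0.5 * (p - y)^2 and dL/dp = p - y. A minimal, runnable Java sketch of the pluggable contract such optimizers need (illustrative only, not FlinkML's actual Scala API):

```java
public class SquaredLoss {

    // Hypothetical minimal contract for a pluggable loss: the value and
    // its first derivative with respect to the prediction are all that
    // SGD-style optimizers consume.
    interface LossFunction {
        double loss(double prediction, double label);
        double derivative(double prediction, double label); // d loss / d prediction
    }

    public static void main(String[] args) {
        LossFunction squared = new LossFunction() {
            @Override
            public double loss(double p, double y) {
                double d = p - y;
                return 0.5 * d * d;
            }

            @Override
            public double derivative(double p, double y) {
                return p - y;
            }
        };
        System.out.println(squared.loss(2.0, 1.0));       // 0.5
        System.out.println(squared.derivative(2.0, 1.0)); // 1.0
    }
}
```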
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308205#comment-15308205 ] ASF GitHub Bot commented on FLINK-3477: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r65229499 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java --- @@ -42,34 +44,38 @@ * Combine operator for Reduce functions, standalone (not chained). * Sorts and groups and reduces data, but never spills the sort. May produce multiple * partially aggregated groups. - * + * * @param The data type consumed and produced by the combiner. */ public class ReduceCombineDriver implements Driver{ - + private static final Logger LOG = LoggerFactory.getLogger(ReduceCombineDriver.class); /** Fix length records with a length below this threshold will be in-place sorted, if possible. */ private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; - - + + private TaskContext taskContext; private TypeSerializer serializer; private TypeComparator comparator; - + private ReduceFunction reducer; - + private Collector output; - + + private DriverStrategy strategy; + private InMemorySorter sorter; - + private QuickSort sortAlgo = new QuickSort(); + private ReduceHashTable table; + private List memory; - private boolean running; + private volatile boolean canceled; --- End diff -- Oops, it's `volatile` in the other drivers as well, sorry! So then it can be just volatile here as well, right? > Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is incrementally applied to compute a final > aggregated value. This allows to hold the preaggregated value in a hash-table > and update it with each function call. > The hash-based strategy requires special implementation of an in-memory hash > table. The hash table should support in place updates of elements (if the > updated value has the same size as the new value) but also appending updates > with invalidation of the old value (if the binary length of the new value > differs). The hash table needs to be able to evict and emit all elements if > it runs out-of-memory. > We should also add {{HASH}} and {{SORT}} compiler hints to > {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the > execution strategy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308191#comment-15308191 ] ASF GitHub Bot commented on FLINK-3477: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r65228213 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java --- @@ -135,10 +136,37 @@ public UnsortedGrouping(DataSet set, Keys keys) { * @see DataSet */ public ReduceOperator reduce(ReduceFunction reducer) { + return reduce(Utils.getCallLocationName(), reducer, CombineHint.OPTIMIZER_CHOOSES); + } + + /** +* Applies a Reduce transformation on a grouped {@link DataSet}. +* For each group, the transformation consecutively calls a {@link org.apache.flink.api.common.functions.RichReduceFunction} +* until only a single element for each group remains. +* A ReduceFunction combines two elements into one new element of the same type. +* +* @param reducer The ReduceFunction that is applied on each group of the DataSet. +* @param strategy The strategy that should be used to execute the combine phase of the reduce. +* If {@code null} is given, then the optimizer will pick the strategy. +* @return A ReduceOperator that represents the reduced DataSet. +* +* @see org.apache.flink.api.common.functions.RichReduceFunction +* @see ReduceOperator +* @see DataSet +*/ + public ReduceOperator reduce(ReduceFunction reducer, CombineHint strategy) { --- End diff -- e7851693dd261efb6eb1fcde12b04fcf384bd0d4 > Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is incrementally applied to compute a final > aggregated value. This allows to hold the preaggregated value in a hash-table > and update it with each function call. > The hash-based strategy requires special implementation of an in-memory hash > table. The hash table should support in place updates of elements (if the > updated value has the same size as the new value) but also appending updates > with invalidation of the old value (if the binary length of the new value > differs). The hash table needs to be able to evict and emit all elements if > it runs out-of-memory. > We should also add {{HASH}} and {{SORT}} compiler hints to > {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the > execution strategy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
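Based on the overload under review here, usage would look roughly like the following sketch (the import path for CombineHint and the final merged signature are assumptions and may have changed before merging):

```java
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CombineHintExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> words = env.fromElements(
            new Tuple2<>("a", 1), new Tuple2<>("b", 1), new Tuple2<>("a", 1));

        // CombineHint.HASH requests the hash-based combiner, CombineHint.SORT
        // the sort-based one; OPTIMIZER_CHOOSES leaves it to the optimizer.
        words.groupBy(0)
             .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1), CombineHint.HASH)
             .print();
    }
}
```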
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308193#comment-15308193 ] ASF GitHub Bot commented on FLINK-3477: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r65228287 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala --- @@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag]( } /** - * Creates a new [[DataSet]] by merging the elements of each group (elements with the same key) - * using an associative reduce function. - */ +* Creates a new [[DataSet]] by merging the elements of each group (elements with the same key) +* using an associative reduce function. +*/ def reduce(fun: (T, T) => T): DataSet[T] = { +reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES) + } + + /** + * Special [[reduce]] operation for explicitly telling the system what strategy to use for the + * combine phase. + * If null is given as the strategy, then the optimizer will pick the strategy. + */ + def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = { --- End diff -- b01fd497fe5e2cf85b71fa8a7b4ec60b5e47d1c5 > Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is incrementally applied to compute a final > aggregated value. This allows to hold the preaggregated value in a hash-table > and update it with each function call. > The hash-based strategy requires special implementation of an in-memory hash > table. The hash table should support in place updates of elements (if the > updated value has the same size as the new value) but also appending updates > with invalidation of the old value (if the binary length of the new value > differs). The hash table needs to be able to evict and emit all elements if > it runs out-of-memory. > We should also add {{HASH}} and {{SORT}} compiler hints to > {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the > execution strategy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3806) Revert use of DataSet.count() in Gelly
[ https://issues.apache.org/jira/browse/FLINK-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308181#comment-15308181 ] ASF GitHub Bot commented on FLINK-3806: --- Github user greghogan commented on the pull request: https://github.com/apache/flink/pull/2036 All comments should now be addressed. > Revert use of DataSet.count() in Gelly > -- > > Key: FLINK-3806 > URL: https://issues.apache.org/jira/browse/FLINK-3806 > Project: Flink > Issue Type: Improvement > Components: Gelly >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Critical > Fix For: 1.1.0 > > > FLINK-1632 replaced {{GraphUtils.count}} with {{DataSetUtils.count}}. The > former returns a {{DataSet}} while the latter executes a job to return a Java > value. > {{DataSetUtils.count}} is called from {{Graph.numberOfVertices}} and > {{Graph.numberOfEdges}}, which are called from {{GatherSumApplyIteration}} and > {{ScatterGatherIteration}} as well as the {{PageRank}} algorithms when the > user does not pass the number of vertices as a parameter. > As noted in FLINK-1632, this does make the code simpler but, if my > understanding is correct, will materialize the Graph twice. The Graph will > need to be reread from input, regenerated, or recomputed by preceding > algorithms. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
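The distinction the issue draws, sketched as fragments (assumed names; `edges` stands for a DataSet feeding the graph):

```java
// DataSetUtils.count style: eagerly executes a job just to get the number,
// which forces the input to be materialized once for the count alone.
long numEdges = edges.count();

// GraphUtils.count style: the count stays a DataSet<Long>, so the optimizer
// can compute it inside the same job, e.g. as a broadcast set, and the
// input is materialized only once.
DataSet<Long> numEdgesLazy = edges
    .map(e -> 1L)
    .reduce((a, b) -> a + b);
```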
[GitHub] flink pull request: [hotfix] Fix JSONDeserializationSchema for Kafka; Parame...
GitHub user rmetzger opened a pull request: https://github.com/apache/flink/pull/2057 [hotfix] Fix JSONDeserializationSchema for Kafka; ParameterTool usability The JSONDeserializationSchema was implementing the KeyedDeserializationSchema. However, it was not using the key or any other additional metadata. Therefore, I changed the base class. Also, the `ParameterTool`'s exception for reading a `.properties` file has been improved. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rmetzger/flink mini-hotfix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2057.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2057 commit 6f334b81fd17d80a46d62797e918a098406cf6a0 Author: Robert Metzger Date: 2016-05-31T16:50:35Z [hotfix] Fix JSONDeserializationSchema for Kafka; ParameterTool usability --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r65219886 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala --- @@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag]( } /** - * Creates a new [[DataSet]] by merging the elements of each group (elements with the same key) - * using an associative reduce function. - */ +* Creates a new [[DataSet]] by merging the elements of each group (elements with the same key) +* using an associative reduce function. +*/ def reduce(fun: (T, T) => T): DataSet[T] = { +reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES) + } + + /** + * Special [[reduce]] operation for explicitly telling the system what strategy to use for the + * combine phase. + * If null is given as the strategy, then the optimizer will pick the strategy. + */ + def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = { --- End diff -- > Would it be OK if I added optional arguments instead of the overloads like this? Oops, this doesn't work. I'm getting ``` Error:(45, 7) in class GroupedDataSet, multiple overloaded alternatives of reduce define default arguments ``` An alternative solution would be to instead of just wrapping in a Scala `DataSet`, create a Scala class that wraps `ReduceOperator`, has the same `setCombineHint` method, and inherits from the Scala `DataSet`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308080#comment-15308080 ] ASF GitHub Bot commented on FLINK-3477: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r65219886 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala --- @@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag]( } /** - * Creates a new [[DataSet]] by merging the elements of each group (elements with the same key) - * using an associative reduce function. - */ +* Creates a new [[DataSet]] by merging the elements of each group (elements with the same key) +* using an associative reduce function. +*/ def reduce(fun: (T, T) => T): DataSet[T] = { +reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES) + } + + /** + * Special [[reduce]] operation for explicitly telling the system what strategy to use for the + * combine phase. + * If null is given as the strategy, then the optimizer will pick the strategy. + */ + def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = { --- End diff -- > Would it be OK if I added optional arguments instead of the overloads like this? Oops, this doesn't work. I'm getting ``` Error:(45, 7) in class GroupedDataSet, multiple overloaded alternatives of reduce define default arguments ``` An alternative solution would be to instead of just wrapping in a Scala `DataSet`, create a Scala class that wraps `ReduceOperator`, has the same `setCombineHint` method, and inherits from the Scala `DataSet`. > Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is incrementally applied to compute a final > aggregated value. This allows to hold the preaggregated value in a hash-table > and update it with each function call. > The hash-based strategy requires special implementation of an in-memory hash > table. The hash table should support in place updates of elements (if the > updated value has the same size as the new value) but also appending updates > with invalidation of the old value (if the binary length of the new value > differs). The hash table needs to be able to evict and emit all elements if > it runs out-of-memory. > We should also add {{HASH}} and {{SORT}} compiler hints to > {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the > execution strategy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-3998) Access to gauges and counters in StatsDReporter#report() is not properly synchronized
Ted Yu created FLINK-3998: - Summary: Access to gauges and counters in StatsDReporter#report() is not properly synchronized Key: FLINK-3998 URL: https://issues.apache.org/jira/browse/FLINK-3998 Project: Flink Issue Type: Bug Reporter: Ted Yu {code} for (Map.Entry entry : gauges.entrySet()) { reportGauge(entry.getValue(), entry.getKey()); } for (Map.Entry entry : counters.entrySet()) { reportCounter(entry.getValue(), entry.getKey()); {code} Access to gauges and counters should be protected by lock on AbstractReporter.this -- This message was sent by Atlassian JIRA (v6.3.4#6332)
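A sketch of the change the issue is asking for (not a committed patch): wrap the iteration in a `synchronized` block on the reporter instance, which is the lock the surrounding `AbstractReporter` uses when metrics are registered and removed. Field and method names are taken from the snippet above; the generic types follow the field declarations in `AbstractReporter` and should be treated as an assumption here.

```java
// Inside StatsDReporter — illustrative only, not the committed fix for FLINK-3998.
@Override
public void report() {
    // 'this' is the AbstractReporter instance, so metric additions/removals
    // and this iteration can no longer interleave.
    synchronized (this) {
        for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
            reportGauge(entry.getValue(), entry.getKey());
        }
        for (Map.Entry<Counter, String> entry : counters.entrySet()) {
            reportCounter(entry.getValue(), entry.getKey());
        }
    }
}
```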
[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction
[ https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308038#comment-15308038 ] ASF GitHub Bot commented on FLINK-3477: --- Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r65216454 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java --- @@ -42,34 +44,38 @@ * Combine operator for Reduce functions, standalone (not chained). * Sorts and groups and reduces data, but never spills the sort. May produce multiple * partially aggregated groups. - * + * * @param The data type consumed and produced by the combiner. */ public class ReduceCombineDriver implements Driver{ - + private static final Logger LOG = LoggerFactory.getLogger(ReduceCombineDriver.class); /** Fix length records with a length below this threshold will be in-place sorted, if possible. */ private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; - - + + private TaskContext taskContext; private TypeSerializer serializer; private TypeComparator comparator; - + private ReduceFunction reducer; - + private Collector output; - + + private DriverStrategy strategy; + private InMemorySorter sorter; - + private QuickSort sortAlgo = new QuickSort(); + private ReduceHashTable table; + private List memory; - private boolean running; + private volatile boolean canceled; --- End diff -- I have now looked more into this `volatile` stuff, and the problem is that further investigation would be needed to ensure that the variable being `volatile` doesn't have a bad performance effect (see [1]). So maybe the best option for now is to make it non-volatile (as it is in all the other drivers) and just hope that it doesn't blow up. Should I open a Jira for doing further investigation and then maybe changing all the drivers? [1] http://brooker.co.za/blog/2012/09/10/volatile.html > Add hash-based combine strategy for ReduceFunction > -- > > Key: FLINK-3477 > URL: https://issues.apache.org/jira/browse/FLINK-3477 > Project: Flink > Issue Type: Sub-task > Components: Local Runtime >Reporter: Fabian Hueske >Assignee: Gabor Gevay > > This issue is about adding a hash-based combine strategy for ReduceFunctions. > The interface of the {{reduce()}} method is as follows: > {code} > public T reduce(T v1, T v2) > {code} > Input type and output type are identical and the function returns only a > single value. A Reduce function is incrementally applied to compute a final > aggregated value. This allows to hold the preaggregated value in a hash-table > and update it with each function call. > The hash-based strategy requires special implementation of an in-memory hash > table. The hash table should support in place updates of elements (if the > updated value has the same size as the new value) but also appending updates > with invalidation of the old value (if the binary length of the new value > differs). The hash table needs to be able to evict and emit all elements if > it runs out-of-memory. > We should also add {{HASH}} and {{SORT}} compiler hints to > {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the > execution strategy. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
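For readers not deep in the driver code, the question above is about the classic cancellation-flag pattern: one thread sets the flag, the processing loop reads it, and `volatile` is what guarantees the write becomes visible, at a potential cost in the hot loop. A generic, self-contained illustration (this is not the `ReduceCombineDriver` code):

```java
// Generic illustration of the cancellation-flag pattern discussed above.
public class LoopingTask implements Runnable {

    // With 'volatile', a cancel() call from another thread is guaranteed to become
    // visible to the loop; without it, visibility relies on whatever other
    // synchronization the loop happens to perform (which is what the other drivers rely on).
    private volatile boolean canceled;

    @Override
    public void run() {
        while (!canceled) {
            doSomeWork();
        }
    }

    public void cancel() {
        canceled = true;
    }

    private void doSomeWork() {
        // stand-in for processing one record / one unit of work
    }
}
```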
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308039#comment-15308039 ] ASF GitHub Bot commented on FLINK-2314: --- Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/2020 The changes look good, we just have to figure out what to do about the methods on `StreamExecutionEnvironment`. One thing I'd like to get rid of is that `CheckpointableInputFormat.getCurrentState` returns a Tuple that contains the split. The split is never used internally; it's only stored to return it from this method, but the read operator itself also stores the split, so I think it is redundant. > Make Streaming File Sources Persistent > -- > > Key: FLINK-2314 > URL: https://issues.apache.org/jira/browse/FLINK-2314 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas > > Streaming File sources should participate in the checkpointing. They should > track the bytes they read from the file and checkpoint it. > One can look at the sequence generating source function for an example of a > checkpointed source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/2020 The changes look good, we just have to figure out what to do about the methods on `StreamExecutionEnvironment`. One thing I'd like to get rid of is that `CheckpointableInputFormat.getCurrentState` returns a Tuple that contains the split. The split is never used internally; it's only stored to return it from this method, but the read operator itself also stores the split, so I think it is redundant. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...
Github user ggevay commented on a diff in the pull request: https://github.com/apache/flink/pull/1517#discussion_r65216454 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java --- @@ -42,34 +44,38 @@ * Combine operator for Reduce functions, standalone (not chained). * Sorts and groups and reduces data, but never spills the sort. May produce multiple * partially aggregated groups. - * + * * @param The data type consumed and produced by the combiner. */ public class ReduceCombineDriver implements Driver{ - + private static final Logger LOG = LoggerFactory.getLogger(ReduceCombineDriver.class); /** Fix length records with a length below this threshold will be in-place sorted, if possible. */ private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32; - - + + private TaskContext taskContext; private TypeSerializer serializer; private TypeComparator comparator; - + private ReduceFunction reducer; - + private Collector output; - + + private DriverStrategy strategy; + private InMemorySorter sorter; - + private QuickSort sortAlgo = new QuickSort(); + private ReduceHashTable table; + private List memory; - private boolean running; + private volatile boolean canceled; --- End diff -- I have now looked more into this `volatile` stuff, and the problem is that further investigation would be needed to ensure that the variable being `volatile` doesn't have a bad performance effect (see [1]). So maybe the best option for now is to make it non-volatile (as it is in all the other drivers) and just hope that it doesn't blow up. Should I open a Jira for doing further investigation and then maybe changing all the drivers? [1] http://brooker.co.za/blog/2012/09/10/volatile.html --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65216093 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java --- @@ -104,8 +106,21 @@ public void runCheckpointedProgram() { postSubmit(); } catch (Exception e) { + Throwable th = e; --- End diff -- What was the reason for this again? I don't remember. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308035#comment-15308035 ] ASF GitHub Bot commented on FLINK-2314: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65216093 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java --- @@ -104,8 +106,21 @@ public void runCheckpointedProgram() { postSubmit(); } catch (Exception e) { + Throwable th = e; --- End diff -- What was the reason for this again? I don't remember. > Make Streaming File Sources Persistent > -- > > Key: FLINK-2314 > URL: https://issues.apache.org/jira/browse/FLINK-2314 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas > > Streaming File sources should participate in the checkpointing. They should > track the bytes they read from the file and checkpoint it. > One can look at the sequence generating source function for an example of a > checkpointed source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3993) [py] Add generateSequence() support to Python API
[ https://issues.apache.org/jira/browse/FLINK-3993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308013#comment-15308013 ] ASF GitHub Bot commented on FLINK-3993: --- Github user zentol commented on the pull request: https://github.com/apache/flink/pull/2055 please add them as a separate commit, they will be squashed when merging. Good point regarding frm. > [py] Add generateSequence() support to Python API > - > > Key: FLINK-3993 > URL: https://issues.apache.org/jira/browse/FLINK-3993 > Project: Flink > Issue Type: Improvement > Components: Python API >Affects Versions: 1.0.3 >Reporter: Omar Alvarez >Priority: Minor > Original Estimate: 24h > Remaining Estimate: 24h > > Right now, I believe that there is only from_elements() support in order to > create a sequence of numbers. It is interesting to be able to create a list > of numbers from the Python API also, apart from the Java API. It would not be > complicated, since we already have generateSequence(). I am already working > on this, and will create a pull request shortly in Github. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3993] [py] Add generateSequence() support to Pyth...
Github user zentol commented on the pull request: https://github.com/apache/flink/pull/2055 please add them as a separate commit, they will be squashed when merging. Good point regarding frm. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308005#comment-15308005 ] ASF GitHub Bot commented on FLINK-2314: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65212938 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java --- @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.JobException; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * This is the single (non-parallel) task which takes a {@link FileInputFormat} and is responsible for + * i) monitoring a user-provided path, ii) deciding which files should be further read and processed, + * iii) creating the {@link FileInputSplit FileInputSplits} corresponding to those files, and iv) assigning + * them to downstream tasks for further reading and processing. Which splits will be further processed + * depends on the user-provided {@link ProcessingMode} and the {@link FilePathFilter}. + * The splits of the files to be read are then forwarded to the downstream + * {@link ContinuousFileReaderOperator} which can have parallelism greater than one. + */ +@Internal +public class ContinuousFileMonitoringFunction + extends RichSourceFunction implements Checkpointed>, Tuple2 , Long>> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class); + + /** +* The minimum interval allowed between consecutive path scans. This is applicable if the +* {@code watchType} is set to {@code PROCESS_CONTINUOUSLY}. +*/ + public static final long MIN_MONITORING_INTERVAL = 100l; + + /** +* Specifies when computation will be triggered. 
+*/ + public enum ProcessingMode { --- End diff -- Also as `PublicEvolving`. > Make Streaming File Sources Persistent > -- > > Key: FLINK-2314 > URL: https://issues.apache.org/jira/browse/FLINK-2314 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas > > Streaming File sources should participate in the checkpointing. They should > track the bytes they read from the file and checkpoint it. > One can look at the sequence generating source function for an example of a > checkpointed source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308004#comment-15308004 ] ASF GitHub Bot commented on FLINK-2314: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65212915 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java --- @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.JobException; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * This is the single (non-parallel) task which takes a {@link FileInputFormat} and is responsible for + * i) monitoring a user-provided path, ii) deciding which files should be further read and processed, + * iii) creating the {@link FileInputSplit FileInputSplits} corresponding to those files, and iv) assigning + * them to downstream tasks for further reading and processing. Which splits will be further processed + * depends on the user-provided {@link ProcessingMode} and the {@link FilePathFilter}. + * The splits of the files to be read are then forwarded to the downstream + * {@link ContinuousFileReaderOperator} which can have parallelism greater than one. + */ +@Internal +public class ContinuousFileMonitoringFunction + extends RichSourceFunction implements Checkpointed>, Tuple2 , Long>> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class); + + /** +* The minimum interval allowed between consecutive path scans. This is applicable if the +* {@code watchType} is set to {@code PROCESS_CONTINUOUSLY}. +*/ + public static final long MIN_MONITORING_INTERVAL = 100l; + + /** +* Specifies when computation will be triggered. 
+*/ + public enum ProcessingMode { --- End diff -- We should probably move this outside of this class and probably name it `FileProcessingMode`. Just in case we want to change the actual file source again. > Make Streaming File Sources Persistent > -- > > Key: FLINK-2314 > URL: https://issues.apache.org/jira/browse/FLINK-2314 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas > > Streaming File sources should participate in the checkpointing. They should > track the bytes they read from the file and checkpoint it. > One can look at the sequence generating source function for an example of a > checkpointed source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65212938 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java --- @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.JobException; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * This is the single (non-parallel) task which takes a {@link FileInputFormat} and is responsible for + * i) monitoring a user-provided path, ii) deciding which files should be further read and processed, + * iii) creating the {@link FileInputSplit FileInputSplits} corresponding to those files, and iv) assigning + * them to downstream tasks for further reading and processing. Which splits will be further processed + * depends on the user-provided {@link ProcessingMode} and the {@link FilePathFilter}. + * The splits of the files to be read are then forwarded to the downstream + * {@link ContinuousFileReaderOperator} which can have parallelism greater than one. + */ +@Internal +public class ContinuousFileMonitoringFunction + extends RichSourceFunction implements Checkpointed>, Tuple2 , Long>> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class); + + /** +* The minimum interval allowed between consecutive path scans. This is applicable if the +* {@code watchType} is set to {@code PROCESS_CONTINUOUSLY}. +*/ + public static final long MIN_MONITORING_INTERVAL = 100l; + + /** +* Specifies when computation will be triggered. +*/ + public enum ProcessingMode { --- End diff -- Also as `PublicEvolving`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65212915 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java --- @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.functions.source; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.JobException; +import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * This is the single (non-parallel) task which takes a {@link FileInputFormat} and is responsible for + * i) monitoring a user-provided path, ii) deciding which files should be further read and processed, + * iii) creating the {@link FileInputSplit FileInputSplits} corresponding to those files, and iv) assigning + * them to downstream tasks for further reading and processing. Which splits will be further processed + * depends on the user-provided {@link ProcessingMode} and the {@link FilePathFilter}. + * The splits of the files to be read are then forwarded to the downstream + * {@link ContinuousFileReaderOperator} which can have parallelism greater than one. + */ +@Internal +public class ContinuousFileMonitoringFunction + extends RichSourceFunction implements Checkpointed>, Tuple2 , Long>> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class); + + /** +* The minimum interval allowed between consecutive path scans. This is applicable if the +* {@code watchType} is set to {@code PROCESS_CONTINUOUSLY}. +*/ + public static final long MIN_MONITORING_INTERVAL = 100l; + + /** +* Specifies when computation will be triggered. +*/ + public enum ProcessingMode { --- End diff -- We should probably move this outside of this class and probably name it `FileProcessingMode`. Just in case we want to change the actual file source again. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307995#comment-15307995 ] ASF GitHub Bot commented on FLINK-2314: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65212384 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala --- @@ -483,28 +500,31 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * @return The data stream that represents the data read from the given file */ def readFile[T: TypeInformation]( - inputFormat: FileInputFormat[T], - filePath: String, - watchType: WatchType, - interval: Long): DataStream[T] = +inputFormat: FileInputFormat[T], --- End diff -- IntelliJ formatting is off here, same below. > Make Streaming File Sources Persistent > -- > > Key: FLINK-2314 > URL: https://issues.apache.org/jira/browse/FLINK-2314 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas > > Streaming File sources should participate in the checkpointing. They should > track the bytes they read from the file and checkpoint it. > One can look at the sequence generating source function for an example of a > checkpointed source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65212384 --- Diff: flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala --- @@ -483,28 +500,31 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * @return The data stream that represents the data read from the given file */ def readFile[T: TypeInformation]( - inputFormat: FileInputFormat[T], - filePath: String, - watchType: WatchType, - interval: Long): DataStream[T] = +inputFormat: FileInputFormat[T], --- End diff -- IntelliJ formatting is off here, same below. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Closed] (FLINK-3886) Give a better error when the application Main class is not public.
[ https://issues.apache.org/jira/browse/FLINK-3886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-3886. --- > Give a better error when the application Main class is not public. > -- > > Key: FLINK-3886 > URL: https://issues.apache.org/jira/browse/FLINK-3886 > Project: Flink > Issue Type: Improvement >Reporter: Niels Basjes >Assignee: Niels Basjes > Fix For: 1.1.0 > > Attachments: FLINK-3886-20160509.patch > > > I wrote a Flink application and made the simple mistake of making the Main > class 'package private' by simply writing it as (note: I simply forgot the > 'public' keyword) > {code} > class Main { >... > } > {code} > The error you get is: > {code} > Caused by: java.lang.IllegalAccessException: Class > org.apache.flink.client.program.PackagedProgram can not access a member of > class com.bol.experiment.flink.Main with modifiers "public static" > at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) > at > java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296) > at > java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288) > at java.lang.reflect.Method.invoke(Method.java:490) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) > {code} > This took me 30 minutes to figure out what I did wrong. > I think the error message should be more explanatory to the developer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
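As an illustration of the kind of up-front check that produces a friendlier error than the raw `IllegalAccessException` above, here is a hedged sketch using plain reflection; it is not the change that was actually committed to `PackagedProgram`.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public final class EntryClassCheck {

    /**
     * Fails fast with an explanatory message when the entry class or its main method
     * cannot be invoked reflectively (e.g. the class was left package-private).
     */
    public static void checkMainIsInvokable(Class<?> entryClass) throws NoSuchMethodException {
        if (!Modifier.isPublic(entryClass.getModifiers())) {
            throw new IllegalArgumentException("The program's entry class "
                    + entryClass.getName() + " must be declared 'public', otherwise its "
                    + "main(String[]) method cannot be invoked.");
        }
        Method main = entryClass.getMethod("main", String[].class);
        if (!Modifier.isStatic(main.getModifiers())) {
            throw new IllegalArgumentException("The main(String[]) method of "
                    + entryClass.getName() + " must be 'static'.");
        }
    }
}
```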
[jira] [Closed] (FLINK-3978) Add hasBroadcastVariable method to RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-3978. --- > Add hasBroadcastVariable method to RuntimeContext > - > > Key: FLINK-3978 > URL: https://issues.apache.org/jira/browse/FLINK-3978 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > Fix For: 1.1.0 > > > The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an > exception if the accumulator does not exist or if the accumulator exists, but > with different type", although {{AbstractRuntimeUDFContext}} does not throw > an exception but will return null. > The javadocs for {{getBroadcastVariable}} do not mention throwing an > exception. Currently the only way to handle a broadcast variable that > may or may not exist is to catch and ignore the exception. Adding a > {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this > explicit. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
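Below is a hedged usage sketch of what the new check makes possible: probing for an optional broadcast variable instead of catching and ignoring the exception. The issue text mentions both `containsBroadcastVariable` and the title's `hasBroadcastVariable`; the name used here follows the title and should be read as an assumption about the final method.

```java
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ScaleMapper extends RichMapFunction<Long, Long> {

    private long scale = 1L;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Only read the broadcast variable if it was actually registered;
        // previously the only option was to catch and ignore the exception.
        if (getRuntimeContext().hasBroadcastVariable("scale")) {
            List<Long> broadcast = getRuntimeContext().getBroadcastVariable("scale");
            scale = broadcast.get(0);
        }
    }

    @Override
    public Long map(Long value) {
        return value * scale;
    }
}
```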
[jira] [Resolved] (FLINK-3886) Give a better error when the application Main class is not public.
[ https://issues.apache.org/jira/browse/FLINK-3886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-3886. - Resolution: Fixed Fix Version/s: 1.1.0 Fixed via 6db9e6ae5c2f4968687a3fcae0520e05442bd9ab > Give a better error when the application Main class is not public. > -- > > Key: FLINK-3886 > URL: https://issues.apache.org/jira/browse/FLINK-3886 > Project: Flink > Issue Type: Improvement >Reporter: Niels Basjes >Assignee: Niels Basjes > Fix For: 1.1.0 > > Attachments: FLINK-3886-20160509.patch > > > I wrote a Flink application and made the simple mistake of making the Main > class 'package private' by simply writing it as (note: I simply forgot the > 'public' keyword) > {code} > class Main { >... > } > {code} > The error you get is: > {code} > Caused by: java.lang.IllegalAccessException: Class > org.apache.flink.client.program.PackagedProgram can not access a member of > class com.bol.experiment.flink.Main with modifiers "public static" > at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102) > at > java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296) > at > java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288) > at java.lang.reflect.Method.invoke(Method.java:490) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) > {code} > This took me 30 minutes to figure out what I did wrong. > I think the error message should be more explanatory to the developer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-3979) [documentation]add missed import classes in run_example_quickstart
[ https://issues.apache.org/jira/browse/FLINK-3979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-3979. - Resolution: Fixed Fix Version/s: 1.1.0 Fixed via ad52a5f0cebe6e5d593be0ae40656b28470af106 Thank you for the contribution! > [documentation]add missed import classes in run_example_quickstart > -- > > Key: FLINK-3979 > URL: https://issues.apache.org/jira/browse/FLINK-3979 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Jia Zhai > Fix For: 1.1.0 > > > The classes that need to be imported for this part of code > {code} > result > .map(new MapFunction, String>() { > @Override > public String map(Tuple2 tuple) { > return tuple.toString(); > } > }) > .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new > SimpleStringSchema())); > {code} > is > {code} > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08; > import org.apache.flink.streaming.util.serialization.SimpleStringSchema; > import org.apache.flink.api.common.functions.MapFunction; > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-3979) [documentation]add missed import classes in run_example_quickstart
[ https://issues.apache.org/jira/browse/FLINK-3979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-3979. --- > [documentation]add missed import classes in run_example_quickstart > -- > > Key: FLINK-3979 > URL: https://issues.apache.org/jira/browse/FLINK-3979 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Jia Zhai > Fix For: 1.1.0 > > > The classes that need to be imported for this part of code > {code} > result > .map(new MapFunction, String>() { > @Override > public String map(Tuple2 tuple) { > return tuple.toString(); > } > }) > .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new > SimpleStringSchema())); > {code} > is > {code} > import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08; > import org.apache.flink.streaming.util.serialization.SimpleStringSchema; > import org.apache.flink.api.common.functions.MapFunction; > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-3978) Add hasBroadcastVariable method to RuntimeContext
[ https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-3978. - Resolution: Fixed Fixed via 5b0287971fa2beda360105d96e7bfbc7a110fae7 > Add hasBroadcastVariable method to RuntimeContext > - > > Key: FLINK-3978 > URL: https://issues.apache.org/jira/browse/FLINK-3978 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.1.0 >Reporter: Greg Hogan >Assignee: Greg Hogan >Priority: Minor > Fix For: 1.1.0 > > > The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an > exception if the accumulator does not exist or if the accumulator exists, but > with different type", although {{AbstractRuntimeUDFContext}} does not throw > an exception but will return null. > The javadocs for {{getBroadcastVariable}} do not mention throwing an > exception. Currently the only way to handle a broadcast variable that > may or may not exist is to catch and ignore the exception. Adding a > {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this > explicit. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307951#comment-15307951 ] ASF GitHub Bot commented on FLINK-2314: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65209503 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java --- @@ -897,21 +900,21 @@ public TimeCharacteristic getStreamTimeCharacteristic() { * The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit * @return The DataStream containing the given directory. */ - public DataStream readFileStream(String filePath, long intervalMillis, WatchType watchType) { + public DataStream readTextFile(String filePath, ProcessingMode watchType, long intervalMillis) { Preconditions.checkNotNull(filePath, "The file path may not be null."); TextInputFormat format = new TextInputFormat(new Path(filePath)); - return readFile(format, filePath, watchType, intervalMillis); + return readFile(format, filePath, watchType, intervalMillis, FilePathFilter.DefaultFilter.getInstance()); } /** * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such * line. The file will be read with the system's default character set. * * -* NOTES ON CHECKPOINTING: The source (which executes the {@link FileSplitMonitoringFunction}) +* NOTES ON CHECKPOINTING: The source (which executes the {@link ContinuousFileMonitoringFunction}) --- End diff -- Same as above, and in the javadoc below. > Make Streaming File Sources Persistent > -- > > Key: FLINK-2314 > URL: https://issues.apache.org/jira/browse/FLINK-2314 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas > > Streaming File sources should participate in the checkpointing. They should > track the bytes they read from the file and checkpoint it. > One can look at the sequence generating source function for an example of a > checkpointed source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65209503 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java --- @@ -897,21 +900,21 @@ public TimeCharacteristic getStreamTimeCharacteristic() { * The mode in which the source should operate, i.e. monitor path and react to new data, or process once and exit * @return The DataStream containing the given directory. */ - public DataStream readFileStream(String filePath, long intervalMillis, WatchType watchType) { + public DataStream readTextFile(String filePath, ProcessingMode watchType, long intervalMillis) { Preconditions.checkNotNull(filePath, "The file path may not be null."); TextInputFormat format = new TextInputFormat(new Path(filePath)); - return readFile(format, filePath, watchType, intervalMillis); + return readFile(format, filePath, watchType, intervalMillis, FilePathFilter.DefaultFilter.getInstance()); } /** * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such * line. The file will be read with the system's default character set. * * -* NOTES ON CHECKPOINTING: The source (which executes the {@link FileSplitMonitoringFunction}) +* NOTES ON CHECKPOINTING: The source (which executes the {@link ContinuousFileMonitoringFunction}) --- End diff -- Same as above, and in the javadoc below. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307948#comment-15307948 ] ASF GitHub Bot commented on FLINK-2314: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65209426 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java --- @@ -877,15 +880,15 @@ public TimeCharacteristic getStreamTimeCharacteristic() { /** * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such -* line. Depending on the provided {@link WatchType}, the source may periodically monitor (every {@code interval} ms) -* the path for new data ({@link WatchType#REPROCESS_WITH_APPENDED}), or process once the data currently in the path and -* exit ({@link WatchType#PROCESS_ONCE}). +* line. Depending on the provided {@link ProcessingMode}, the source may periodically monitor (every {@code interval} ms) +* the path for new data ({@link ProcessingMode#PROCESS_CONTINUOUSLY}), or process once the data currently in the path and +* exit ({@link ProcessingMode#PROCESS_ONCE}). * * -* NOTES ON CHECKPOINTING: If the {@code watchType} is set to {@link WatchType#PROCESS_ONCE}, the source -* (which executes the {@link FileSplitMonitoringFunction}) monitors the path once, creates the +* NOTES ON CHECKPOINTING: If the {@code watchType} is set to {@link ProcessingMode#PROCESS_ONCE}, the source +* (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path once, creates the --- End diff -- I think we can remove the "(which executes the {@link ContinuousFileMonitoringFunction})" part. The source does not execute this, it is itself the source. > Make Streaming File Sources Persistent > -- > > Key: FLINK-2314 > URL: https://issues.apache.org/jira/browse/FLINK-2314 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas > > Streaming File sources should participate in the checkpointing. They should > track the bytes they read from the file and checkpoint it. > One can look at the sequence generating source function for an example of a > checkpointed source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65209426 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java --- @@ -877,15 +880,15 @@ public TimeCharacteristic getStreamTimeCharacteristic() { /** * Reads the given file line-by-line and creates a data stream that contains a string with the contents of each such -* line. Depending on the provided {@link WatchType}, the source may periodically monitor (every {@code interval} ms) -* the path for new data ({@link WatchType#REPROCESS_WITH_APPENDED}), or process once the data currently in the path and -* exit ({@link WatchType#PROCESS_ONCE}). +* line. Depending on the provided {@link ProcessingMode}, the source may periodically monitor (every {@code interval} ms) +* the path for new data ({@link ProcessingMode#PROCESS_CONTINUOUSLY}), or process once the data currently in the path and +* exit ({@link ProcessingMode#PROCESS_ONCE}). * * -* NOTES ON CHECKPOINTING: If the {@code watchType} is set to {@link WatchType#PROCESS_ONCE}, the source -* (which executes the {@link FileSplitMonitoringFunction}) monitors the path once, creates the +* NOTES ON CHECKPOINTING: If the {@code watchType} is set to {@link ProcessingMode#PROCESS_ONCE}, the source +* (which executes the {@link ContinuousFileMonitoringFunction}) monitors the path once, creates the --- End diff -- I think we can remove the "(which executes the {@link ContinuousFileMonitoringFunction})" part. The source does not execute this, it is itself the source. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
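Pulling the pieces of this thread together, here is a minimal usage sketch based on the signatures visible in the diffs above (`readTextFile(path, watchType, interval)` and the five-argument `readFile(...)` with a `FilePathFilter`). The enum location and names follow this pull request as-is, i.e. before the proposed rename to `FileProcessingMode`, so treat them as assumptions about a moving target.

```java
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.ProcessingMode;
import org.apache.flink.streaming.api.functions.source.FilePathFilter;

public class ContinuousFileSourceSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Continuously monitor the path, re-scanning it every 10 seconds.
        DataStream<String> continuous =
                env.readTextFile("hdfs:///logs", ProcessingMode.PROCESS_CONTINUOUSLY, 10000L);

        // Process the data currently in the path once and exit; a small positive interval is
        // passed since the monitoring function above defines MIN_MONITORING_INTERVAL = 100 ms.
        TextInputFormat format = new TextInputFormat(new Path("hdfs:///logs"));
        DataStream<String> once = env.readFile(
                format, "hdfs:///logs", ProcessingMode.PROCESS_ONCE, 100L,
                FilePathFilter.DefaultFilter.getInstance());

        continuous.print();
        once.print();
        env.execute("continuous file source sketch");
    }
}
```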
[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65207037 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java --- @@ -82,6 +82,8 @@ */ private static int MAX_SAMPLE_LEN; + private boolean restoring = false; --- End diff -- That's probably left from a previous change --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent
[ https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307930#comment-15307930 ] ASF GitHub Bot commented on FLINK-2314: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/2020#discussion_r65207037 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java --- @@ -82,6 +82,8 @@ */ private static int MAX_SAMPLE_LEN; + private boolean restoring = false; --- End diff -- That's probably left from a previous change > Make Streaming File Sources Persistent > -- > > Key: FLINK-2314 > URL: https://issues.apache.org/jira/browse/FLINK-2314 > Project: Flink > Issue Type: Improvement > Components: Streaming >Affects Versions: 0.9 >Reporter: Stephan Ewen >Assignee: Kostas Kloudas > > Streaming File sources should participate in the checkpointing. They should > track the bytes they read from the file and checkpoint it. > One can look at the sequence generating source function for an example of a > checkpointed source. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-3994) Instable KNNITSuite
[ https://issues.apache.org/jira/browse/FLINK-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307904#comment-15307904 ] ASF GitHub Bot commented on FLINK-3994: --- Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/2056 Thanks for clarifying @StephanEwen! I'll merge this after Travis succeeds. > Instable KNNITSuite > --- > > Key: FLINK-3994 > URL: https://issues.apache.org/jira/browse/FLINK-3994 > Project: Flink > Issue Type: Bug > Components: Machine Learning Library, Tests >Affects Versions: 1.1.0 >Reporter: Chiwan Park >Assignee: Chiwan Park >Priority: Critical > Labels: test-stability > Fix For: 1.1.0 > > > KNNITSuite fails in Travis-CI with following error: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > ... > Cause: java.io.IOException: Insufficient number of network buffers: > required 32, but only 4 available. The total number of network buffers is > currently set to 2048. You can increase this number by setting the > configuration key 'taskmanager.network.numberOfBuffers'. > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196) > at > org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:497) > at java.lang.Thread.run(Thread.java:745) > ... > {code} > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064237/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064236/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064235/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134052961/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-3994] [ml, tests] Fix flaky KNN integration tests
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/2056 Thanks for clarifying @StephanEwen! I'll merge this after Travis succeeds. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-3994) Instable KNNITSuite
[ https://issues.apache.org/jira/browse/FLINK-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307899#comment-15307899 ] ASF GitHub Bot commented on FLINK-3994: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/2056 There is actually a problem with the way the ScalaTest suites are written: the code in the class body that sits outside the "it should" clauses is executed before the "before" function runs. That is why the tests go against a LocalEnvironment rather than the test context environment. So Chiwan's patch will actually fix it, though for different reasons than initially expected. > Instable KNNITSuite > --- > > Key: FLINK-3994 > URL: https://issues.apache.org/jira/browse/FLINK-3994 > Project: Flink > Issue Type: Bug > Components: Machine Learning Library, Tests >Affects Versions: 1.1.0 >Reporter: Chiwan Park >Assignee: Chiwan Park >Priority: Critical > Labels: test-stability > Fix For: 1.1.0 > > > KNNITSuite fails in Travis-CI with the following error: > {code} > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) > at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346) > ... > Cause: java.io.IOException: Insufficient number of network buffers: > required 32, but only 4 available. The total number of network buffers is > currently set to 2048. You can increase this number by setting the > configuration key 'taskmanager.network.numberOfBuffers'. > at > org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196) > at > org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:497) > at java.lang.Thread.run(Thread.java:745) > ... > {code} > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064237/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064236/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064235/log.txt > https://s3.amazonaws.com/archive.travis-ci.org/jobs/134052961/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)
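To make the ordering Stephan describes concrete, the following self-contained ScalaTest sketch (hypothetical suite name, FlatSpec style as used by the Flink ML tests of that era) records when the class body and the before hook run: statements in the class body are evaluated while the suite is being constructed, before any before { ... } hook registered via BeforeAndAfter.
{code}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

import scala.collection.mutable.ArrayBuffer

class InitializationOrderSpec extends FlatSpec with Matchers with BeforeAndAfter {

  // Filled in as the suite is set up, to record the order of initialization.
  private val initOrder = ArrayBuffer.empty[String]

  // This statement is part of the class body, so it runs while the suite
  // object is being constructed, before any `before` hook. A test
  // ExecutionEnvironment created here would not see what `before` sets up.
  initOrder += "class body"

  before {
    // Runs just before each test, after the class body above has executed.
    initOrder += "before hook"
  }

  "Class-body code" should "run before the before-hook" in {
    initOrder.toList shouldBe List("class body", "before hook")
  }
}
{code}
This is why an execution environment obtained in the class body sees only the default LocalEnvironment and not the test cluster context that the before hook would install.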