[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309297#comment-15309297
 ] 

ASF GitHub Bot commented on FLINK-3763:
---

Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/2054#discussion_r65303646
  
--- Diff: 
flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
 ---
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.google.common.base.Preconditions;
+import com.rabbitmq.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Connection Configuration for RMQ.
+ * If {@link Builder#setUri(String)} has been set, then
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, int, int, int)}
+ * will be used to initialize the RMQ connection; otherwise
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, String, String, int, boolean, boolean, int, int, int, int)}
+ * will be used to initialize the RMQ connection.
+ */
+public class RMQConnectionConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class);
+
+   private String host;
+   private int port;
+   private String virtualHost;
+   private String username;
+   private String password;
+   private String uri;
+
+   private int networkRecoveryInterval;
+   private boolean automaticRecovery;
+   private boolean topologyRecovery;
+
+   private int connectionTimeout;
+   private int requestedChannelMax;
+   private int requestedFrameMax;
+   private int requestedHeartbeat;
+
+   /**
+*
+* @param host host name
+* @param port port
+* @param virtualHost virtual host
+* @param username username
+* @param password password
+*
+* @param networkRecoveryInterval connection recovery interval in milliseconds
+* @param automaticRecovery if automatic connection recovery
+* @param topologyRecovery if topology recovery
+* @param connectionTimeout connection timeout
+* @param requestedChannelMax requested maximum channel number
+* @param requestedFrameMax requested maximum frame size
+* @param requestedHeartbeat requested heartbeat interval
+* @throws NullPointerException if host or virtual host or username or password is null
+ */
+   private RMQConnectionConfig(String host, int port, String virtualHost, String username, String password,
+   int networkRecoveryInterval, boolean automaticRecovery,
+   boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax,
+   int requestedHeartbeat){
+   Preconditions.checkNotNull(host, "host can not be null");
+   Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
+   Preconditions.checkNotNull(username, "username can not be null");
+   Preconditions.checkNotNull(password, "password can not be null");
+   this.host = host;
+   this.port = port;
+   this.virtualHost = virtualHost;
+   this.username = username;
+   this.password = password;
+
+   this.networkRecoveryInterval = networkRecoveryInterval;
+   this.automaticRecovery = automaticRecovery;
+   this.topologyRecovery = topologyRecovery;
+   this.connectionTimeout = connectionTimeout;
+   

[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...

2016-05-31 Thread subhankarb
Github user subhankarb commented on a diff in the pull request:

https://github.com/apache/flink/pull/2054#discussion_r65303646
  
--- Diff: 
flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
 ---
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.google.common.base.Preconditions;
+import com.rabbitmq.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Connection Configuration for RMQ.
+ * If {@link Builder#setUri(String)} has been set, then
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, int, int, int)}
+ * will be used to initialize the RMQ connection; otherwise
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, String, String, int, boolean, boolean, int, int, int, int)}
+ * will be used to initialize the RMQ connection.
+ */
+public class RMQConnectionConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = LoggerFactory.getLogger(RMQConnectionConfig.class);
+
+   private String host;
+   private int port;
+   private String virtualHost;
+   private String username;
+   private String password;
+   private String uri;
+
+   private int networkRecoveryInterval;
+   private boolean automaticRecovery;
+   private boolean topologyRecovery;
+
+   private int connectionTimeout;
+   private int requestedChannelMax;
+   private int requestedFrameMax;
+   private int requestedHeartbeat;
+
+   /**
+*
+* @param host host name
+* @param port port
+* @param virtualHost virtual host
+* @param username username
+* @param password password
+*
+* @param networkRecoveryInterval connection recovery interval in milliseconds
+* @param automaticRecovery if automatic connection recovery
+* @param topologyRecovery if topology recovery
+* @param connectionTimeout connection timeout
+* @param requestedChannelMax requested maximum channel number
+* @param requestedFrameMax requested maximum frame size
+* @param requestedHeartbeat requested heartbeat interval
+* @throws NullPointerException if host or virtual host or username or password is null
+ */
+   private RMQConnectionConfig(String host, int port, String virtualHost, String username, String password,
+   int networkRecoveryInterval, boolean automaticRecovery,
+   boolean topologyRecovery, int connectionTimeout, int requestedChannelMax, int requestedFrameMax,
+   int requestedHeartbeat){
+   Preconditions.checkNotNull(host, "host can not be null");
+   Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
+   Preconditions.checkNotNull(username, "username can not be null");
+   Preconditions.checkNotNull(password, "password can not be null");
+   this.host = host;
+   this.port = port;
+   this.virtualHost = virtualHost;
+   this.username = username;
+   this.password = password;
+
+   this.networkRecoveryInterval = networkRecoveryInterval;
+   this.automaticRecovery = automaticRecovery;
+   this.topologyRecovery = topologyRecovery;
+   this.connectionTimeout = connectionTimeout;
+   this.requestedChannelMax = requestedChannelMax;
+   this.requestedFrameMax = requestedFrameMax;
+   this.requestedHeartbeat = requestedHeartbeat;
+   }
+
+   /**
+*
+* @param uri 
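For orientation, a connection configuration like the one in this diff typically maps onto the RabbitMQ Java client's ConnectionFactory roughly as follows. This is a minimal sketch assuming plain field values are available; the class name, method name and the concrete values are illustrative, and the PR's actual getConnectionFactory() may differ in details.

```java
import com.rabbitmq.client.ConnectionFactory;

/** Illustrative mapping of the configuration fields onto a ConnectionFactory. */
public final class ConnectionFactorySketch {

    /** Builds a factory either from a single AMQP URI or from individual fields. */
    static ConnectionFactory buildFactory(String uri, String host, int port, String virtualHost,
                                          String username, String password) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        if (uri != null) {
            // A URI such as "amqp://user:pass@host:5672/vhost" carries all five
            // basic connection parameters in one string.
            factory.setUri(uri);
        } else {
            factory.setHost(host);
            factory.setPort(port);
            factory.setVirtualHost(virtualHost); // defaults to "/" if never set
            factory.setUsername(username);
            factory.setPassword(password);
        }
        // Recovery and sizing options corresponding to the remaining fields.
        factory.setAutomaticRecoveryEnabled(true);
        factory.setTopologyRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(10_000);  // milliseconds
        factory.setConnectionTimeout(60_000);        // milliseconds
        factory.setRequestedChannelMax(0);           // 0 = unlimited channels
        factory.setRequestedFrameMax(0);             // 0 = no frame size limit
        factory.setRequestedHeartbeat(30);           // seconds
        return factory;
    }
}
```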

[jira] [Created] (FLINK-4000) Exception: Could not restore checkpointed state to operators and functions; during Job Restart (Job restart is triggered due to one of the task manager failure)

2016-05-31 Thread Aride Chettali (JIRA)
Aride Chettali created FLINK-4000:
-

 Summary: Exception: Could not restore checkpointed state to operators and functions; during job restart (the restart was triggered by a task manager failure)
 Key: FLINK-4000
 URL: https://issues.apache.org/jira/browse/FLINK-4000
 Project: Flink
  Issue Type: Bug
  Components: DataStream API
Affects Versions: 1.0.3
 Environment: //Fault Tolerance Configuration of the Job

env.enableCheckpointing(5000); 
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3,1));
Reporter: Aride Chettali


java.lang.Exception: Could not restore checkpointed state to operators and functions
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:457)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:209)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Failed to restore state to function: null
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:168)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreState(StreamTask.java:449)
... 3 more
Caused by: java.lang.NullPointerException
at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.restoreState(MessageAcknowledgingSourceBase.java:184)
at org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase.restoreState(MessageAcknowledgingSourceBase.java:80)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreState(AbstractUdfStreamOperator.java:165)
... 4 more



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
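For context, the fault-tolerance settings quoted in the environment section above correspond to roughly the following job setup. This is a minimal, self-contained sketch; the trivial placeholder source and the job name are not part of the reported job.

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5 seconds with exactly-once guarantees and at most
        // one checkpoint in flight, exactly as in the issue's environment section.
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // Restart up to 3 times with a 1 ms delay between attempts,
        // e.g. after a TaskManager failure.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 1));

        // Placeholder pipeline so the sketch is runnable; the real job's sources
        // (e.g. a message-acknowledging source) would go here instead.
        env.fromElements(1, 2, 3).print();
        env.execute("checkpointed-job-sketch");
    }
}
```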


[jira] [Commented] (FLINK-1979) Implement Loss Functions

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309058#comment-15309058
 ] 

ASF GitHub Bot commented on FLINK-1979:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1985
  
Hi @skavulya, I just quickly reviewed your updated PR and left a few 
comments. They are not critical, but it would be better to fix them. 
Everything else looks very good.


> Implement Loss Functions
> 
>
> Key: FLINK-1979
> URL: https://issues.apache.org/jira/browse/FLINK-1979
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Johannes Günther
>Assignee: Johannes Günther
>Priority: Minor
>  Labels: ML
>
> For convex optimization problems, optimizer methods like SGD rely on a 
> pluggable implementation of a loss function and its first derivative.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/1985
  
Hi @skavulya, I just quickly reviewed your updated PR and left a few 
comments. They are not critical, but it would be better to fix them. 
Everything else looks very good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1979) Implement Loss Functions

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309056#comment-15309056
 ] 

ASF GitHub Bot commented on FLINK-1979:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r65291933
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationPenaltyTest.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{FlatSpec, Matchers}
+
+
+class RegularizationPenaltyTest extends FlatSpec with Matchers with FlinkTestBase {
--- End diff --

Same as above, we don't need to extend `FlinkTestBase`.


> Implement Loss Functions
> 
>
> Key: FLINK-1979
> URL: https://issues.apache.org/jira/browse/FLINK-1979
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Johannes Günther
>Assignee: Johannes Günther
>Priority: Minor
>  Labels: ML
>
> For convex optimization problems, optimizer methods like SGD rely on a 
> pluggable implementation of a loss function and its first derivative.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r65291933
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationPenaltyTest.scala
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.math.DenseVector
+import org.apache.flink.test.util.FlinkTestBase
+import org.scalatest.{FlatSpec, Matchers}
+
+
+class RegularizationPenaltyTest extends FlatSpec with Matchers with FlinkTestBase {
--- End diff --

Same as above, we don't need to extend `FlinkTestBase`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1979) Implement Loss Functions

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309054#comment-15309054
 ] 

ASF GitHub Bot commented on FLINK-1979:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r65291911
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionTest.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.common.{LabeledVector, WeightVector}
+import org.apache.flink.ml.math.DenseVector
+import org.scalatest.{Matchers, FlatSpec}
+import org.apache.flink.test.util.FlinkTestBase
+
+
+class LossFunctionTest extends FlatSpec with Matchers with FlinkTestBase {
--- End diff --

We don't need to extend `FlinkTestBase` because `LossFunctionTest` does not use a Flink cluster.


> Implement Loss Functions
> 
>
> Key: FLINK-1979
> URL: https://issues.apache.org/jira/browse/FLINK-1979
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Johannes Günther
>Assignee: Johannes Günther
>Priority: Minor
>  Labels: ML
>
> For convex optimization problems, optimizer methods like SGD rely on a 
> pluggable implementation of a loss function and its first derivative.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r65291911
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionTest.scala
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.common.{LabeledVector, WeightVector}
+import org.apache.flink.ml.math.DenseVector
+import org.scalatest.{Matchers, FlatSpec}
+import org.apache.flink.test.util.FlinkTestBase
+
+
+class LossFunctionTest extends FlatSpec with Matchers with FlinkTestBase {
--- End diff --

We don't need to extend `FlinkTestBase` because `LossFunctionTest` does not use a Flink cluster.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r65291552
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/RegularizationPenalty.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.math.{Vector, BLAS}
+import org.apache.flink.ml.math.Breeze._
+import breeze.linalg.{norm => BreezeNorm}
+
+/** Represents a type of regularization penalty
+  *
+  * Regularization penalties are used to restrict the optimization problem 
to solutions with
+  * certain desirable characteristics, such as sparsity for the L1 
penalty, or penalizing large
+  * weights for the L2 penalty.
+  *
+  * The regularization term, `R(w)` is added to the objective function, 
`f(w) = L(w) + lambda*R(w)`
+  * where lambda is the regularization parameter used to tune the amount 
of regularization applied.
+  */
+trait RegularizationPenalty extends Serializable {
+
+  /** Calculates the new weights based on the gradient and regularization 
penalty
+*
+* @param weightVector The weights to be updated
+* @param gradient The gradient used to update the weights
+* @param regularizationConstant The regularization parameter to be 
applied 
+* @param learningRate The effective step size for this iteration
+* @return Updated weights
+*/
+  def takeStep(
+  weightVector: Vector,
+  gradient: Vector,
+  regularizationConstant: Double,
+  learningRate: Double)
+: Vector
+
+  /** Adds regularization to the loss value
+*
+* @param oldLoss The loss to be updated
+* @param weightVector The gradient used to update the loss
+* @param regularizationConstant The regularization parameter to be 
applied
+* @return Updated loss
+*/
+  def regLoss(oldLoss: Double, weightVector: Vector, 
regularizationConstant: Double): Double
+
+}
+
+
+/** `L_2` regularization penalty.
+  *
+  * The regularization function is the square of the L2 norm 
`1/2*||w||_2^2`
+  * with `w` being the weight vector. The function penalizes large weights,
+  * favoring solutions with more small weights rather than few large ones.
+  */
+object L2Regularization extends RegularizationPenalty {
+
+  /** Calculates the new weights based on the gradient and L2 
regularization penalty
+*
+* The updated weight is `w - learningRate *(gradient + lambda * w)` 
where
+* `w` is the weight vector, and `lambda` is the regularization 
parameter.
+*
+* @param weightVector The weights to be updated
+* @param gradient The gradient according to which we will update the 
weights
+* @param regularizationConstant The regularization parameter to be 
applied
+* @param learningRate The effective step size for this iteration
+* @return Updated weights
+*/
+  override def takeStep(
+  weightVector: Vector,
+  gradient: Vector,
+  regularizationConstant: Double,
+  learningRate: Double)
+: Vector = {
+// add the gradient of the L2 regularization
+BLAS.axpy(regularizationConstant, weightVector, gradient)
+
+// update the weights according to the learning rate
+BLAS.axpy(-learningRate, gradient, weightVector)
+
+weightVector
+  }
+
+  /** Adds regularization to the loss value
+*
+* The updated loss is `oldLoss + lambda * 1/2*||w||_2^2` where
+* `w` is the weight vector, and `lambda` is the regularization 
parameter
+*
+* @param oldLoss The loss to be updated
+* @param weightVector The gradient used to update the loss
+* @param regularizationConstant The regularization parameter to be 
applied
+* @return Updated loss
+*/
+  override def 

[jira] [Commented] (FLINK-1979) Implement Loss Functions

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309044#comment-15309044
 ] 

ASF GitHub Bot commented on FLINK-1979:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r65291552
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/RegularizationPenalty.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.optimization
+
+import org.apache.flink.ml.math.{Vector, BLAS}
+import org.apache.flink.ml.math.Breeze._
+import breeze.linalg.{norm => BreezeNorm}
+
+/** Represents a type of regularization penalty
+  *
+  * Regularization penalties are used to restrict the optimization problem 
to solutions with
+  * certain desirable characteristics, such as sparsity for the L1 
penalty, or penalizing large
+  * weights for the L2 penalty.
+  *
+  * The regularization term, `R(w)` is added to the objective function, 
`f(w) = L(w) + lambda*R(w)`
+  * where lambda is the regularization parameter used to tune the amount 
of regularization applied.
+  */
+trait RegularizationPenalty extends Serializable {
+
+  /** Calculates the new weights based on the gradient and regularization 
penalty
+*
+* @param weightVector The weights to be updated
+* @param gradient The gradient used to update the weights
+* @param regularizationConstant The regularization parameter to be 
applied 
+* @param learningRate The effective step size for this iteration
+* @return Updated weights
+*/
+  def takeStep(
+  weightVector: Vector,
+  gradient: Vector,
+  regularizationConstant: Double,
+  learningRate: Double)
+: Vector
+
+  /** Adds regularization to the loss value
+*
+* @param oldLoss The loss to be updated
+* @param weightVector The gradient used to update the loss
+* @param regularizationConstant The regularization parameter to be 
applied
+* @return Updated loss
+*/
+  def regLoss(oldLoss: Double, weightVector: Vector, 
regularizationConstant: Double): Double
+
+}
+
+
+/** `L_2` regularization penalty.
+  *
+  * The regularization function is the square of the L2 norm 
`1/2*||w||_2^2`
+  * with `w` being the weight vector. The function penalizes large weights,
+  * favoring solutions with more small weights rather than few large ones.
+  */
+object L2Regularization extends RegularizationPenalty {
+
+  /** Calculates the new weights based on the gradient and L2 
regularization penalty
+*
+* The updated weight is `w - learningRate *(gradient + lambda * w)` 
where
+* `w` is the weight vector, and `lambda` is the regularization 
parameter.
+*
+* @param weightVector The weights to be updated
+* @param gradient The gradient according to which we will update the 
weights
+* @param regularizationConstant The regularization parameter to be 
applied
+* @param learningRate The effective step size for this iteration
+* @return Updated weights
+*/
+  override def takeStep(
+  weightVector: Vector,
+  gradient: Vector,
+  regularizationConstant: Double,
+  learningRate: Double)
+: Vector = {
+// add the gradient of the L2 regularization
+BLAS.axpy(regularizationConstant, weightVector, gradient)
+
+// update the weights according to the learning rate
+BLAS.axpy(-learningRate, gradient, weightVector)
+
+weightVector
+  }
+
+  /** Adds regularization to the loss value
+*
+* The updated loss is `oldLoss + lambda * 1/2*||w||_2^2` where
+* `w` is the weight vector, and `lambda` is the regularization 
parameter
+*
+* @param oldLoss The loss to 
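To make the update and loss formulas quoted in this diff concrete, here is a plain-array illustration of the same arithmetic, w ← w − learningRate · (gradient + lambda · w) and oldLoss + lambda · ½‖w‖², not Flink's BLAS/Vector-based code; the class and method names exist only for this sketch.

```java
/** Plain-array illustration of the L2 update and regularized loss quoted above. */
public final class L2RegularizationSketch {

    /** w <- w - learningRate * (gradient + lambda * w), applied in place. */
    static void takeStep(double[] weights, double[] gradient,
                         double lambda, double learningRate) {
        for (int i = 0; i < weights.length; i++) {
            weights[i] -= learningRate * (gradient[i] + lambda * weights[i]);
        }
    }

    /** oldLoss + lambda * 0.5 * ||w||_2^2 */
    static double regLoss(double oldLoss, double[] weights, double lambda) {
        double squaredNorm = 0.0;
        for (double w : weights) {
            squaredNorm += w * w;
        }
        return oldLoss + lambda * 0.5 * squaredNorm;
    }

    public static void main(String[] args) {
        double[] w = {1.0, -2.0};
        takeStep(w, new double[]{0.5, 0.5}, 0.1, 0.01);
        System.out.println(regLoss(0.3, w, 0.1));
    }
}
```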

[jira] [Commented] (FLINK-1979) Implement Loss Functions

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309043#comment-15309043
 ] 

ASF GitHub Bot commented on FLINK-1979:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r65291461
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala
 ---
@@ -47,21 +47,106 @@ object SquaredLoss extends PartialLossFunction {
 
   /** Calculates the loss depending on the label and the prediction
 *
-* @param prediction
-* @param label
-* @return
+* @param prediction The predicted value
+* @param label The true value
+* @return The loss
 */
   override def loss(prediction: Double, label: Double): Double = {
 0.5 * (prediction - label) * (prediction - label)
   }
 
   /** Calculates the derivative of the [[PartialLossFunction]]
 *
-* @param prediction
-* @param label
-* @return
+* @param prediction The predicted value
+* @param label The true value
+* @return The derivative of the loss function
 */
   override def derivative(prediction: Double, label: Double): Double = {
 (prediction - label)
   }
 }
+
+/** Logistic loss function which can be used with the 
[[GenericLossFunction]]
+  *
+  *
+  * The [[LogisticLoss]] function implements `log(1 + exp(-prediction*label))`
+  * for binary classification with label in {-1, 1}
+  */
+object LogisticLoss extends PartialLossFunction {
+
+  /** Calculates the loss depending on the label and the prediction
+*
+* @param prediction The predicted value
+* @param label The true value
+* @return The loss
+*/
+  override def loss(prediction: Double, label: Double): Double = {
+val z = prediction * label
+
+// based on implementation in scikit-learn
+// approximately equal and saves the computation of the log
+if (z > 18) {
+  return math.exp(-z)
+}
+else if (z < -18) {
+  return -z
+}
+
+math.log(1 + math.exp(-z))
+  }
+
+  /** Calculates the derivative of the loss function with respect to the 
prediction
+*
+* @param prediction The predicted value
+* @param label The true value
+* @return The derivative of the loss function
+*/
+  override def derivative(prediction: Double, label: Double): Double = {
+val z = prediction * label
+
+// based on implementation in scikit-learn
+// approximately equal and saves the computation of the log
+if (z > 18) {
+  return -label * math.exp(-z)
+}
+else if (z < -18) {
+  return -label
+}
+
+-label/(math.exp(z) + 1)
+  }
--- End diff --

As I said above, the following is better:

```scala
if (z > 18) {
  -label * math.exp(-z)
} else if (z < -18) {
  -label
} else {
  -label / (math.exp(z) + 1)
}
```


> Implement Loss Functions
> 
>
> Key: FLINK-1979
> URL: https://issues.apache.org/jira/browse/FLINK-1979
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Johannes Günther
>Assignee: Johannes Günther
>Priority: Minor
>  Labels: ML
>
> For convex optimization problems, optimizer methods like SGD rely on a 
> pluggable implementation of a loss function and its first derivative.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
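As a standalone illustration of the piecewise, numerically stable formulas discussed in this comment (shown here in Java rather than the Scala under review; the class name is illustrative and this is not Flink's implementation):

```java
/** Numerically stable logistic loss and derivative, following the quoted thresholds. */
public final class LogisticLossSketch {

    /** loss(p, y) = log(1 + exp(-p*y)), approximated for large |p*y|. */
    static double loss(double prediction, double label) {
        double z = prediction * label;
        if (z > 18) {
            // exp(-z) is tiny, so log(1 + exp(-z)) ~= exp(-z)
            return Math.exp(-z);
        } else if (z < -18) {
            // exp(-z) dominates, so log(1 + exp(-z)) ~= -z
            return -z;
        } else {
            return Math.log(1 + Math.exp(-z));
        }
    }

    /** d/d(prediction) of the loss: -label / (exp(z) + 1), with the same cutoffs. */
    static double derivative(double prediction, double label) {
        double z = prediction * label;
        if (z > 18) {
            return -label * Math.exp(-z);
        } else if (z < -18) {
            return -label;
        } else {
            return -label / (Math.exp(z) + 1);
        }
    }

    public static void main(String[] args) {
        System.out.println(loss(0.3, 1.0));         // moderate margin
        System.out.println(derivative(-25.0, 1.0)); // saturated region -> -label
    }
}
```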


[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r65291461
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala
 ---
@@ -47,21 +47,106 @@ object SquaredLoss extends PartialLossFunction {
 
   /** Calculates the loss depending on the label and the prediction
 *
-* @param prediction
-* @param label
-* @return
+* @param prediction The predicted value
+* @param label The true value
+* @return The loss
 */
   override def loss(prediction: Double, label: Double): Double = {
 0.5 * (prediction - label) * (prediction - label)
   }
 
   /** Calculates the derivative of the [[PartialLossFunction]]
 *
-* @param prediction
-* @param label
-* @return
+* @param prediction The predicted value
+* @param label The true value
+* @return The derivative of the loss function
 */
   override def derivative(prediction: Double, label: Double): Double = {
 (prediction - label)
   }
 }
+
+/** Logistic loss function which can be used with the 
[[GenericLossFunction]]
+  *
+  *
+  * The [[LogisticLoss]] function implements `log(1 + exp(-prediction*label))`
+  * for binary classification with label in {-1, 1}
+  */
+object LogisticLoss extends PartialLossFunction {
+
+  /** Calculates the loss depending on the label and the prediction
+*
+* @param prediction The predicted value
+* @param label The true value
+* @return The loss
+*/
+  override def loss(prediction: Double, label: Double): Double = {
+val z = prediction * label
+
+// based on implementation in scikit-learn
+// approximately equal and saves the computation of the log
+if (z > 18) {
+  return math.exp(-z)
+}
+else if (z < -18) {
+  return -z
+}
+
+math.log(1 + math.exp(-z))
+  }
+
+  /** Calculates the derivative of the loss function with respect to the 
prediction
+*
+* @param prediction The predicted value
+* @param label The true value
+* @return The derivative of the loss function
+*/
+  override def derivative(prediction: Double, label: Double): Double = {
+val z = prediction * label
+
+// based on implementation in scikit-learn
+// approximately equal and saves the computation of the log
+if (z > 18) {
+  return -label * math.exp(-z)
+}
+else if (z < -18) {
+  return -label
+}
+
+-label/(math.exp(z) + 1)
+  }
--- End diff --

As I said above, the following is better:

```scala
if (z > 18) {
  -label * math.exp(-z)
} else if (z < -18) {
  -label
} else {
  -label / (math.exp(z) + 1)
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r65291353
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala
 ---
@@ -47,21 +47,106 @@ object SquaredLoss extends PartialLossFunction {
 
   /** Calculates the loss depending on the label and the prediction
 *
-* @param prediction
-* @param label
-* @return
+* @param prediction The predicted value
+* @param label The true value
+* @return The loss
 */
   override def loss(prediction: Double, label: Double): Double = {
 0.5 * (prediction - label) * (prediction - label)
   }
 
   /** Calculates the derivative of the [[PartialLossFunction]]
 *
-* @param prediction
-* @param label
-* @return
+* @param prediction The predicted value
+* @param label The true value
+* @return The derivative of the loss function
 */
   override def derivative(prediction: Double, label: Double): Double = {
 (prediction - label)
   }
 }
+
+/** Logistic loss function which can be used with the 
[[GenericLossFunction]]
+  *
+  *
+  * The [[LogisticLoss]] function implements `log(1 + exp(-prediction*label))`
+  * for binary classification with label in {-1, 1}
+  */
+object LogisticLoss extends PartialLossFunction {
+
+  /** Calculates the loss depending on the label and the prediction
+*
+* @param prediction The predicted value
+* @param label The true value
+* @return The loss
+*/
+  override def loss(prediction: Double, label: Double): Double = {
+val z = prediction * label
+
+// based on implementation in scikit-learn
+// approximately equal and saves the computation of the log
+if (z > 18) {
+  return math.exp(-z)
+}
+else if (z < -18) {
+  return -z
+}
+
+math.log(1 + math.exp(-z))
--- End diff --

Using `return` is not recommended in Scala. Could you change this to the following?

```scala
if (z > 18) {
  math.exp(-z)
} else if (z < -18) {
  -z
} else {
  math.log(1 + math.exp(-z))
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1979) Implement Loss Functions

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309038#comment-15309038
 ] 

ASF GitHub Bot commented on FLINK-1979:
---

Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/1985#discussion_r65291353
  
--- Diff: 
flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala
 ---
@@ -47,21 +47,106 @@ object SquaredLoss extends PartialLossFunction {
 
   /** Calculates the loss depending on the label and the prediction
 *
-* @param prediction
-* @param label
-* @return
+* @param prediction The predicted value
+* @param label The true value
+* @return The loss
 */
   override def loss(prediction: Double, label: Double): Double = {
 0.5 * (prediction - label) * (prediction - label)
   }
 
   /** Calculates the derivative of the [[PartialLossFunction]]
 *
-* @param prediction
-* @param label
-* @return
+* @param prediction The predicted value
+* @param label The true value
+* @return The derivative of the loss function
 */
   override def derivative(prediction: Double, label: Double): Double = {
 (prediction - label)
   }
 }
+
+/** Logistic loss function which can be used with the 
[[GenericLossFunction]]
+  *
+  *
+  * The [[LogisticLoss]] function implements `log(1 + exp(-prediction*label))`
+  * for binary classification with label in {-1, 1}
+  */
+object LogisticLoss extends PartialLossFunction {
+
+  /** Calculates the loss depending on the label and the prediction
+*
+* @param prediction The predicted value
+* @param label The true value
+* @return The loss
+*/
+  override def loss(prediction: Double, label: Double): Double = {
+val z = prediction * label
+
+// based on implementation in scikit-learn
+// approximately equal and saves the computation of the log
+if (z > 18) {
+  return math.exp(-z)
+}
+else if (z < -18) {
+  return -z
+}
+
+math.log(1 + math.exp(-z))
--- End diff --

Using `return` is not recommended in Scala. Could you change this to the following?

```scala
if (z > 18) {
  math.exp(-z)
} else if (z < -18) {
  -z
} else {
  math.log(1 + math.exp(-z))
}
```


> Implement Loss Functions
> 
>
> Key: FLINK-1979
> URL: https://issues.apache.org/jira/browse/FLINK-1979
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Johannes Günther
>Assignee: Johannes Günther
>Priority: Minor
>  Labels: ML
>
> For convex optimization problems, optimizer methods like SGD rely on a 
> pluggable implementation of a loss function and its first derivative.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3994) Instable KNNITSuite

2016-05-31 Thread Chiwan Park (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chiwan Park closed FLINK-3994.
--
Resolution: Fixed

Fixed via aedc0fd481a756ef2b1708f896d5500475715232.

Thanks for guidance [~StephanEwen] and [~mxm]! :-)

> Instable KNNITSuite
> ---
>
> Key: FLINK-3994
> URL: https://issues.apache.org/jira/browse/FLINK-3994
> Project: Flink
>  Issue Type: Bug
>  Components: Machine Learning Library, Tests
>Affects Versions: 1.1.0
>Reporter: Chiwan Park
>Assignee: Chiwan Park
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.1.0
>
>
> KNNITSuite fails in Travis-CI with following error:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   ...
>   Cause: java.io.IOException: Insufficient number of network buffers: required 32, but only 4 available. The total number of network buffers is currently set to 2048. You can increase this number by setting the configuration key 'taskmanager.network.numberOfBuffers'.
>   at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
>   at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:497)
>   at java.lang.Thread.run(Thread.java:745)
>   ...
> {code}
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064237/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064236/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064235/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134052961/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
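For reference, the configuration key quoted in the error message can also be raised programmatically for a local or test setup; a minimal sketch (the value 4096 is only an example):

```java
import org.apache.flink.configuration.Configuration;

public class NetworkBufferConfigSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Key quoted in the error message above; 2048 is the default it mentions.
        config.setInteger("taskmanager.network.numberOfBuffers", 4096);
        // This Configuration would then be handed to the test/local cluster, or the
        // same key set in flink-conf.yaml for a standalone deployment.
        System.out.println(config);
    }
}
```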


[GitHub] flink pull request: [FLINK-3994] [ml, tests] Fix flaky KNN integration tests

2016-05-31 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2056


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3994) Instable KNNITSuite

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309020#comment-15309020
 ] 

ASF GitHub Bot commented on FLINK-3994:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2056


> Instable KNNITSuite
> ---
>
> Key: FLINK-3994
> URL: https://issues.apache.org/jira/browse/FLINK-3994
> Project: Flink
>  Issue Type: Bug
>  Components: Machine Learning Library, Tests
>Affects Versions: 1.1.0
>Reporter: Chiwan Park
>Assignee: Chiwan Park
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.1.0
>
>
> KNNITSuite fails in Travis-CI with following error:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   ...
>   Cause: java.io.IOException: Insufficient number of network buffers: required 32, but only 4 available. The total number of network buffers is currently set to 2048. You can increase this number by setting the configuration key 'taskmanager.network.numberOfBuffers'.
>   at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
>   at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:497)
>   at java.lang.Thread.run(Thread.java:745)
>   ...
> {code}
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064237/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064236/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064235/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134052961/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3994] [ml, tests] Fix flaky KNN integration tests

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/2056
  
Merging...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3994) Instable KNNITSuite

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15309017#comment-15309017
 ] 

ASF GitHub Bot commented on FLINK-3994:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/2056
  
Merging...


> Instable KNNITSuite
> ---
>
> Key: FLINK-3994
> URL: https://issues.apache.org/jira/browse/FLINK-3994
> Project: Flink
>  Issue Type: Bug
>  Components: Machine Learning Library, Tests
>Affects Versions: 1.1.0
>Reporter: Chiwan Park
>Assignee: Chiwan Park
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.1.0
>
>
> KNNITSuite fails in Travis-CI with following error:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
>   at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   ...
>   Cause: java.io.IOException: Insufficient number of network buffers: required 32, but only 4 available. The total number of network buffers is currently set to 2048. You can increase this number by setting the configuration key 'taskmanager.network.numberOfBuffers'.
>   at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
>   at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:497)
>   at java.lang.Thread.run(Thread.java:745)
>   ...
> {code}
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064237/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064236/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064235/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134052961/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3758) Add possibility to register accumulators in custom triggers

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308581#comment-15308581
 ] 

ASF GitHub Bot commented on FLINK-3758:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1979
  
How about tagging the new method with `@PublicEvolving`? Since it's a new 
API anyway, it's good not to make it part of the stable interfaces.

With Flink 2.0, we can remove the `Evolving` part ;)


> Add possibility to register accumulators in custom triggers
> ---
>
> Key: FLINK-3758
> URL: https://issues.apache.org/jira/browse/FLINK-3758
> Project: Flink
>  Issue Type: Improvement
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Minor
>
> For monitoring purposes it would be nice to be able to use accumulators in 
> custom trigger functions. 
> Basically, the trigger context could just expose {{getAccumulator}} of 
> {{RuntimeContext}}, or does this create problems I am not aware of?
> Adding accumulators in a trigger function is more difficult, I think, but 
> that's not really necessary as the accumulator could just be added in some 
> other upstream operator.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
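To illustrate the suggestion above, marking such a newly exposed accessor as evolving API could look roughly like this. This is a hypothetical sketch only: the interface name is a placeholder and the method merely mirrors the shape of {{RuntimeContext#getAccumulator}}; it is not actual Flink API.

```java
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.accumulators.Accumulator;

import java.io.Serializable;

/** Hypothetical sketch: how a newly exposed accessor might be annotated. */
public interface TriggerContextSketch {

    // Marked as evolving rather than stable API, per the suggestion above.
    @PublicEvolving
    <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name);
}
```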


[GitHub] flink pull request: [FLINK-3758] Add possibility to register accumulators in...

2016-05-31 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1979
  
How about tagging the new method with `@PublicEvolving`? Since it's a new 
API anyway, it's good not to make it part of the stable interfaces.

With Flink 2.0, we can remove the `Evolving` part ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3054) Remove R (return) type variable from SerializationSchema

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308531#comment-15308531
 ] 

ASF GitHub Bot commented on FLINK-3054:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1406
  
I think you can use this class as a reference: 
https://github.com/apache/flink/blob/2c507f93300ad1b53c477e0939f1f2232187b37f/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java



> Remove R (return) type variable from SerializationSchema
> 
>
> Key: FLINK-3054
> URL: https://issues.apache.org/jira/browse/FLINK-3054
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Reporter: Robert Metzger
>Assignee: Robert Metzger
> Fix For: 1.0.0
>
>
> For the upcoming 1.0 release, I would like to remove the R type parameter 
> from the {{SerializationSchema}} class.
> The {{DeserializationSchema}} doesn't have a variable input type either, and I 
> think it's okay to serialize and deserialize to byte[] all the time through 
> these interfaces.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
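After the change described here, a serialization schema always produces a byte[]; a minimal sketch of such a schema follows (the class name is illustrative, and the import path is the one used by the 1.x streaming API):

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.util.serialization.SerializationSchema;

/** Minimal sketch of a schema that serializes straight to byte[]. */
public class Utf8StringSchema implements SerializationSchema<String> {

    private static final long serialVersionUID = 1L;

    @Override
    public byte[] serialize(String element) {
        // No intermediate R type: the schema always produces raw bytes.
        return element.getBytes(StandardCharsets.UTF_8);
    }
}
```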


[GitHub] flink pull request: [FLINK-3054] Remove R (return) type variable from Serial...

2016-05-31 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/1406
  
I think you can use this class as a reference: 
https://github.com/apache/flink/blob/2c507f93300ad1b53c477e0939f1f2232187b37f/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308525#comment-15308525
 ] 

ASF GitHub Bot commented on FLINK-3763:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/2054
  
Thank you for your contribution.
There are still some open issues that need addressing before we can merge 
this.


> RabbitMQ Source/Sink standardize connection parameters
> --
>
> Key: FLINK-3763
> URL: https://issues.apache.org/jira/browse/FLINK-3763
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>
> The RabbitMQ source and sink should have the same capabilities in terms of 
> establishing a connection; currently the sink is lacking connection 
> parameters that are available on the source. Additionally, VirtualHost should 
> be an offered parameter for multi-tenant RabbitMQ clusters (if not specified 
> it goes to the vhost '/').
> Connection Parameters
> ===
> - Host - Offered on both
> - Port - Source only
> - Virtual Host - Neither
> - User - Source only
> - Password - Source only
> Additionally, it might be worth offering the URI as a valid constructor because 
> that would offer all 5 of the above parameters in a single String.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308520#comment-15308520
 ] 

ASF GitHub Bot commented on FLINK-3763:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2054#discussion_r65253337
  
--- Diff: 
flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -35,24 +36,28 @@
private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
private String QUEUE_NAME;
-   private String HOST_NAME;
+   private RMQConnectionConfig rmqConnectionConfig;
private transient ConnectionFactory factory;
private transient Connection connection;
private transient Channel channel;
private SerializationSchema schema;
 
-   public RMQSink(String HOST_NAME, String QUEUE_NAME, 
SerializationSchema schema) {
-   this.HOST_NAME = HOST_NAME;
+   /**
+* @param rmqConnectionConfig The RabbitMQ connection configuration 
{@link RMQConnectionConfig}.
+* @param QUEUE_NAME The queue to publish messages to.
+* @param schema A {@link SerializationSchema} for turning the Java 
objects received into bytes
+ */
+   public RMQSink(RMQConnectionConfig rmqConnectionConfig, String 
QUEUE_NAME, SerializationSchema schema) {
+   this.rmqConnectionConfig = rmqConnectionConfig;
this.QUEUE_NAME = QUEUE_NAME;
this.schema = schema;
}
 
/**
 * Initializes the connection to RMQ.
 */
-   public void initializeConnection() {
-   factory = new ConnectionFactory();
-   factory.setHost(HOST_NAME);
+   public void initializeConnection() throws Exception {
+   factory = rmqConnectionConfig.getConnectionFactory();
--- End diff --

Another comment about something that was already like this before: could 
you remove the initializeConnection() method and move all of its content into the 
open() method?
That reduces unnecessary boilerplate.

Thank you ;)
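
For illustration, a rough sketch of what the sink's open() method could look like after inlining initializeConnection(), assuming the fields shown in the diff above (not the final committed code):

    @Override
    public void open(Configuration config) throws Exception {
        // Former initializeConnection() body, moved here to avoid the extra method.
        factory = rmqConnectionConfig.getConnectionFactory();
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
        } catch (IOException e) {
            throw new RuntimeException("Error while creating the RMQ connection/channel", e);
        }
    }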


> RabbitMQ Source/Sink standardize connection parameters
> --
>
> Key: FLINK-3763
> URL: https://issues.apache.org/jira/browse/FLINK-3763
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>
> The RabbitMQ source and sink should have the same capabilities in terms of 
> establishing a connection; currently, the sink is lacking connection 
> parameters that are available on the source. Additionally, VirtualHost should 
> be an offered parameter for multi-tenant RabbitMQ clusters (if not specified 
> it goes to the vhost '/').
> Connection Parameters
> ===
> - Host - Offered on both
> - Port - Source only
> - Virtual Host - Neither
> - User - Source only
> - Password - Source only
> Additionally, it might be worth offering the URI as a valid constructor argument, because 
> that would supply all 5 of the above parameters in a single String.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308514#comment-15308514
 ] 

ASF GitHub Bot commented on FLINK-3763:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2054#discussion_r65253122
  
--- Diff: 
flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -35,24 +36,28 @@
private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
private String QUEUE_NAME;
--- End diff --

I know it's not part of the issue's scope, but could you rename this field 
to `queueName`? Otherwise, the class has a very poor style, which makes it hard 
to read.


> RabbitMQ Source/Sink standardize connection parameters
> --
>
> Key: FLINK-3763
> URL: https://issues.apache.org/jira/browse/FLINK-3763
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>
> The RabbitMQ source and sink should have the same capabilities in terms of 
> establishing a connection; currently, the sink is lacking connection 
> parameters that are available on the source. Additionally, VirtualHost should 
> be an offered parameter for multi-tenant RabbitMQ clusters (if not specified 
> it goes to the vhost '/').
> Connection Parameters
> ===
> - Host - Offered on both
> - Port - Source only
> - Virtual Host - Neither
> - User - Source only
> - Password - Source only
> Additionally, it might be worth offering the URI as a valid constructor argument, because 
> that would supply all 5 of the above parameters in a single String.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308516#comment-15308516
 ] 

ASF GitHub Bot commented on FLINK-3763:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2054#discussion_r65253162
  
--- Diff: 
flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
 ---
@@ -35,24 +36,28 @@
private static final Logger LOG = 
LoggerFactory.getLogger(RMQSink.class);
 
private String QUEUE_NAME;
-   private String HOST_NAME;
+   private RMQConnectionConfig rmqConnectionConfig;
private transient ConnectionFactory factory;
private transient Connection connection;
private transient Channel channel;
private SerializationSchema schema;
 
-   public RMQSink(String HOST_NAME, String QUEUE_NAME, 
SerializationSchema schema) {
-   this.HOST_NAME = HOST_NAME;
+   /**
+* @param rmqConnectionConfig The RabbitMQ connection configuration 
{@link RMQConnectionConfig}.
+* @param QUEUE_NAME The queue to publish messages to.
+* @param schema A {@link SerializationSchema} for turning the Java 
objects received into bytes
+ */
+   public RMQSink(RMQConnectionConfig rmqConnectionConfig, String 
QUEUE_NAME, SerializationSchema schema) {
--- End diff --

Same here with the QUEUE_NAME
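
A short sketch of how the constructor reads with the suggested lowerCamelCase names (behaviour unchanged; the raw SerializationSchema type is kept as in the diff):

    public RMQSink(RMQConnectionConfig rmqConnectionConfig, String queueName, SerializationSchema schema) {
        this.rmqConnectionConfig = rmqConnectionConfig;
        this.queueName = queueName;   // was QUEUE_NAME
        this.schema = schema;
    }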


> RabbitMQ Source/Sink standardize connection parameters
> --
>
> Key: FLINK-3763
> URL: https://issues.apache.org/jira/browse/FLINK-3763
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>
> The RabbitMQ source and sink should have the same capabilities in terms of 
> establishing a connection; currently, the sink is lacking connection 
> parameters that are available on the source. Additionally, VirtualHost should 
> be an offered parameter for multi-tenant RabbitMQ clusters (if not specified 
> it goes to the vhost '/').
> Connection Parameters
> ===
> - Host - Offered on both
> - Port - Source only
> - Virtual Host - Neither
> - User - Source only
> - Password - Source only
> Additionally, it might be worth offering the URI as a valid constructor argument, because 
> that would supply all 5 of the above parameters in a single String.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308512#comment-15308512
 ] 

ASF GitHub Bot commented on FLINK-3763:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2054#discussion_r65253016
  
--- Diff: docs/apis/streaming/connectors/rabbitmq.md ---
@@ -71,23 +71,25 @@ Example:
 
 
 {% highlight java %}
+RMQConnectionConfig connectionConfig = new 
RMQConnectionConfig.Builder().build();
--- End diff --

As I've written below, I would prefer that users not rely on the fact 
that "localhost" is the default value here.
Once the builder has been changed, we would expect the user to call 
`.setHost("localhost")` here.
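
A sketch of how the documentation example could read with the host set explicitly rather than relying on a default (setter names are assumed to match the Builder introduced in this pull request):

    RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
        .setHost("localhost")
        .setPort(5672)
        .setVirtualHost("/")
        .setUserName("guest")
        .setPassword("guest")
        .build();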


> RabbitMQ Source/Sink standardize connection parameters
> --
>
> Key: FLINK-3763
> URL: https://issues.apache.org/jira/browse/FLINK-3763
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>
> The RabbitMQ source and sink should have the same capabilities in terms of 
> establishing a connection; currently, the sink is lacking connection 
> parameters that are available on the source. Additionally, VirtualHost should 
> be an offered parameter for multi-tenant RabbitMQ clusters (if not specified 
> it goes to the vhost '/').
> Connection Parameters
> ===
> - Host - Offered on both
> - Port - Source only
> - Virtual Host - Neither
> - User - Source only
> - Password - Source only
> Additionally, it might be worth offering the URI as a valid constructor argument, because 
> that would supply all 5 of the above parameters in a single String.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...

2016-05-31 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2054#discussion_r65252614
  
--- Diff: 
flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
 ---
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.google.common.base.Preconditions;
+import com.rabbitmq.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Connection Configuration for RMQ.
+ * If {@link Builder#setUri(String)} has been set then {@link 
RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, 
int, int, int)}
+ * will be used for initialize the RMQ connection or
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, 
String, String, int, boolean, boolean, int, int, int, int)}
+ * will be used for initialize the RMQ connection
+ */
+public class RMQConnectionConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RMQConnectionConfig.class);
+
+   private String host;
+   private int port;
+   private String virtualHost;
+   private String username;
+   private String password;
+   private String uri;
+
+   private int networkRecoveryInterval;
+   private boolean automaticRecovery;
+   private boolean topologyRecovery;
+
+   private int connectionTimeout;
+   private int requestedChannelMax;
+   private int requestedFrameMax;
+   private int requestedHeartbeat;
+
+   /**
+*
+* @param host host name
+* @param port port
+* @param virtualHost virtual host
+* @param username username
+* @param password password
+
+* @param networkRecoveryInterval connection recovery interval in 
milliseconds
+* @param automaticRecovery if automatic connection recovery
+* @param topologyRecovery if topology recovery
+* @param connectionTimeout connection timeout
+* @param requestedChannelMax requested maximum channel number
+* @param requestedFrameMax requested maximum frame size
+* @param requestedHeartbeat requested heartbeat interval
+* @throws NullPointerException if host or virtual host or username or 
password is null
+ */
+   private RMQConnectionConfig(String host, int port, String virtualHost, 
String username, String password,
+   int 
networkRecoveryInterval, boolean automaticRecovery,
+   boolean 
topologyRecovery, int connectionTimeout, int requestedChannelMax, int 
requestedFrameMax,
+   int 
requestedHeartbeat){
+   Preconditions.checkNotNull(host, "host can not be null");
+   Preconditions.checkNotNull(virtualHost, "virtualHost can not be 
null");
+   Preconditions.checkNotNull(username, "username can not be 
null");
+   Preconditions.checkNotNull(password, "password can not be 
null");
+   this.host = host;
+   this.port = port;
+   this.virtualHost = virtualHost;
+   this.username = username;
+   this.password = password;
+
+   this.networkRecoveryInterval = networkRecoveryInterval;
+   this.automaticRecovery = automaticRecovery;
+   this.topologyRecovery = topologyRecovery;
+   this.connectionTimeout = connectionTimeout;
+   this.requestedChannelMax = requestedChannelMax;
+   this.requestedFrameMax = requestedFrameMax;
+   this.requestedHeartbeat = requestedHeartbeat;
+   }
+
+   /**
+*
+* @param uri 

[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...

2016-05-31 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2054#discussion_r65252489
  
--- Diff: 
flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
 ---
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.google.common.base.Preconditions;
+import com.rabbitmq.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Connection Configuration for RMQ.
+ * If {@link Builder#setUri(String)} has been set then {@link 
RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, 
int, int, int)}
+ * will be used for initialize the RMQ connection or
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, 
String, String, int, boolean, boolean, int, int, int, int)}
+ * will be used for initialize the RMQ connection
+ */
+public class RMQConnectionConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RMQConnectionConfig.class);
+
+   private String host;
+   private int port;
+   private String virtualHost;
+   private String username;
+   private String password;
+   private String uri;
+
+   private int networkRecoveryInterval;
+   private boolean automaticRecovery;
+   private boolean topologyRecovery;
+
+   private int connectionTimeout;
+   private int requestedChannelMax;
+   private int requestedFrameMax;
+   private int requestedHeartbeat;
+
+   /**
+*
+* @param host host name
+* @param port port
+* @param virtualHost virtual host
+* @param username username
+* @param password password
+
+* @param networkRecoveryInterval connection recovery interval in 
milliseconds
+* @param automaticRecovery if automatic connection recovery
+* @param topologyRecovery if topology recovery
+* @param connectionTimeout connection timeout
+* @param requestedChannelMax requested maximum channel number
+* @param requestedFrameMax requested maximum frame size
+* @param requestedHeartbeat requested heartbeat interval
+* @throws NullPointerException if host or virtual host or username or 
password is null
+ */
+   private RMQConnectionConfig(String host, int port, String virtualHost, 
String username, String password,
+   int 
networkRecoveryInterval, boolean automaticRecovery,
+   boolean 
topologyRecovery, int connectionTimeout, int requestedChannelMax, int 
requestedFrameMax,
+   int 
requestedHeartbeat){
+   Preconditions.checkNotNull(host, "host can not be null");
+   Preconditions.checkNotNull(virtualHost, "virtualHost can not be 
null");
+   Preconditions.checkNotNull(username, "username can not be 
null");
+   Preconditions.checkNotNull(password, "password can not be 
null");
+   this.host = host;
+   this.port = port;
+   this.virtualHost = virtualHost;
+   this.username = username;
+   this.password = password;
+
+   this.networkRecoveryInterval = networkRecoveryInterval;
+   this.automaticRecovery = automaticRecovery;
+   this.topologyRecovery = topologyRecovery;
+   this.connectionTimeout = connectionTimeout;
+   this.requestedChannelMax = requestedChannelMax;
+   this.requestedFrameMax = requestedFrameMax;
+   this.requestedHeartbeat = requestedHeartbeat;
+   }
+
+   /**
+*
+* @param uri 

[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...

2016-05-31 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2054#discussion_r65251121
  
--- Diff: 
flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
 ---
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.google.common.base.Preconditions;
+import com.rabbitmq.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Connection Configuration for RMQ.
+ * If {@link Builder#setUri(String)} has been set then {@link 
RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, 
int, int, int)}
+ * will be used for initialize the RMQ connection or
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, 
String, String, int, boolean, boolean, int, int, int, int)}
+ * will be used for initialize the RMQ connection
+ */
+public class RMQConnectionConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RMQConnectionConfig.class);
+
+   private String host;
+   private int port;
+   private String virtualHost;
+   private String username;
+   private String password;
+   private String uri;
+
+   private int networkRecoveryInterval;
+   private boolean automaticRecovery;
+   private boolean topologyRecovery;
+
+   private int connectionTimeout;
+   private int requestedChannelMax;
+   private int requestedFrameMax;
+   private int requestedHeartbeat;
+
+   /**
+*
+* @param host host name
+* @param port port
+* @param virtualHost virtual host
+* @param username username
+* @param password password
+
+* @param networkRecoveryInterval connection recovery interval in 
milliseconds
+* @param automaticRecovery if automatic connection recovery
+* @param topologyRecovery if topology recovery
+* @param connectionTimeout connection timeout
+* @param requestedChannelMax requested maximum channel number
+* @param requestedFrameMax requested maximum frame size
+* @param requestedHeartbeat requested heartbeat interval
+* @throws NullPointerException if host or virtual host or username or 
password is null
+ */
+   private RMQConnectionConfig(String host, int port, String virtualHost, 
String username, String password,
+   int 
networkRecoveryInterval, boolean automaticRecovery,
+   boolean 
topologyRecovery, int connectionTimeout, int requestedChannelMax, int 
requestedFrameMax,
+   int 
requestedHeartbeat){
+   Preconditions.checkNotNull(host, "host can not be null");
+   Preconditions.checkNotNull(virtualHost, "virtualHost can not be 
null");
+   Preconditions.checkNotNull(username, "username can not be 
null");
+   Preconditions.checkNotNull(password, "password can not be 
null");
+   this.host = host;
+   this.port = port;
+   this.virtualHost = virtualHost;
+   this.username = username;
+   this.password = password;
+
+   this.networkRecoveryInterval = networkRecoveryInterval;
+   this.automaticRecovery = automaticRecovery;
+   this.topologyRecovery = topologyRecovery;
+   this.connectionTimeout = connectionTimeout;
+   this.requestedChannelMax = requestedChannelMax;
+   this.requestedFrameMax = requestedFrameMax;
+   this.requestedHeartbeat = requestedHeartbeat;
+   }
+
+   /**
+*
+* @param uri 

[GitHub] flink pull request: [FLINK-3763] RabbitMQ Source/Sink standardize connection...

2016-05-31 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2054#discussion_r65250839
  
--- Diff: 
flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
 ---
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.google.common.base.Preconditions;
+import com.rabbitmq.client.ConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Connection Configuration for RMQ.
+ * If {@link Builder#setUri(String)} has been set then {@link 
RMQConnectionConfig#RMQConnectionConfig(String, int, boolean, boolean, int, 
int, int, int)}
+ * will be used for initialize the RMQ connection or
+ * {@link RMQConnectionConfig#RMQConnectionConfig(String, int, String, 
String, String, int, boolean, boolean, int, int, int, int)}
+ * will be used for initialize the RMQ connection
+ */
+public class RMQConnectionConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(RMQConnectionConfig.class);
+
+   private String host;
+   private int port;
+   private String virtualHost;
+   private String username;
+   private String password;
+   private String uri;
+
+   private int networkRecoveryInterval;
+   private boolean automaticRecovery;
+   private boolean topologyRecovery;
+
+   private int connectionTimeout;
+   private int requestedChannelMax;
+   private int requestedFrameMax;
+   private int requestedHeartbeat;
+
+   /**
+*
+* @param host host name
+* @param port port
+* @param virtualHost virtual host
+* @param username username
+* @param password password
+
+* @param networkRecoveryInterval connection recovery interval in 
milliseconds
+* @param automaticRecovery if automatic connection recovery
+* @param topologyRecovery if topology recovery
+* @param connectionTimeout connection timeout
+* @param requestedChannelMax requested maximum channel number
+* @param requestedFrameMax requested maximum frame size
+* @param requestedHeartbeat requested heartbeat interval
+* @throws NullPointerException if host or virtual host or username or 
password is null
+ */
+   private RMQConnectionConfig(String host, int port, String virtualHost, 
String username, String password,
+   int 
networkRecoveryInterval, boolean automaticRecovery,
+   boolean 
topologyRecovery, int connectionTimeout, int requestedChannelMax, int 
requestedFrameMax,
+   int 
requestedHeartbeat){
+   Preconditions.checkNotNull(host, "host can not be null");
+   Preconditions.checkNotNull(virtualHost, "virtualHost can not be 
null");
+   Preconditions.checkNotNull(username, "username can not be 
null");
+   Preconditions.checkNotNull(password, "password can not be 
null");
+   this.host = host;
+   this.port = port;
+   this.virtualHost = virtualHost;
+   this.username = username;
+   this.password = password;
+
+   this.networkRecoveryInterval = networkRecoveryInterval;
+   this.automaticRecovery = automaticRecovery;
+   this.topologyRecovery = topologyRecovery;
+   this.connectionTimeout = connectionTimeout;
+   this.requestedChannelMax = requestedChannelMax;
+   this.requestedFrameMax = requestedFrameMax;
+   this.requestedHeartbeat = requestedHeartbeat;
+   }
+
+   /**
+*
+* @param uri 

[jira] [Commented] (FLINK-3763) RabbitMQ Source/Sink standardize connection parameters

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308478#comment-15308478
 ] 

ASF GitHub Bot commented on FLINK-3763:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/2054#discussion_r65250708
  
--- Diff: 
flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
 ---
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq.common;
+
+import com.google.common.base.Preconditions;
--- End diff --

Could you use Flink's own `Preconditions` class in org.apache.flink.util?
We are trying to get rid of guava in the long run.
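
For illustration, the suggested change is essentially an import swap; the existing call sites can stay as they are (sketch):

    // Replace the guava import with Flink's own utility class:
    import org.apache.flink.util.Preconditions;

    // Call sites remain unchanged, e.g.:
    Preconditions.checkNotNull(host, "host can not be null");
    Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");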


> RabbitMQ Source/Sink standardize connection parameters
> --
>
> Key: FLINK-3763
> URL: https://issues.apache.org/jira/browse/FLINK-3763
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Affects Versions: 1.0.1
>Reporter: Robert Batts
>Assignee: Subhankar Biswas
>
> The RabbitMQ source and sink should have the same capabilities in terms of 
> establishing a connection; currently, the sink is lacking connection 
> parameters that are available on the source. Additionally, VirtualHost should 
> be an offered parameter for multi-tenant RabbitMQ clusters (if not specified 
> it goes to the vhost '/').
> Connection Parameters
> ===
> - Host - Offered on both
> - Port - Source only
> - Virtual Host - Neither
> - User - Source only
> - Password - Source only
> Additionally, it might be worth offering the URI as a valid constructor argument, because 
> that would supply all 5 of the above parameters in a single String.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)



[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308246#comment-15308246
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65233230
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...

2016-05-31 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65233230
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
+ *
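
To make the pointer layout described above concrete, here is a toy, self-contained sketch that emulates the two areas with ByteBuffers; this is not the actual ReduceHashTable code, and all names are made up for illustration:

{code}
import java.nio.ByteBuffer;

/** Toy model of the layout: bucket heads are 8-byte pointers into the record
 *  area, and each record there is prefixed by an 8-byte "next" pointer. */
public class LinkedRecordAreaSketch {
    private static final long END_OF_LIST = -1L;

    public static void main(String[] args) {
        ByteBuffer bucketArea = ByteBuffer.allocate(4 * 8);   // 4 buckets, one 8-byte head each
        ByteBuffer recordArea = ByteBuffer.allocate(1024);

        // all buckets start out empty
        for (int b = 0; b < 4; b++) {
            bucketArea.putLong(b * 8, END_OF_LIST);
        }

        // insert the record 42 into bucket 1: write [next pointer | record] at the append position
        int appendPos = 0;
        recordArea.putLong(appendPos, bucketArea.getLong(1 * 8)); // next = old bucket head
        recordArea.putInt(appendPos + 8, 42);                     // the record follows the pointer
        bucketArea.putLong(1 * 8, appendPos);                     // bucket head -> new element

        // lookup: walk the linked list of bucket 1
        long p = bucketArea.getLong(1 * 8);
        while (p != END_OF_LIST) {
            System.out.println("record: " + recordArea.getInt((int) (p + 8)));
            p = recordArea.getLong((int) p);
        }
    }
}
{code}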

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308233#comment-15308233
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65231707
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...

2016-05-31 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65231707
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
+ *

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308232#comment-15308232
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65231642
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of 

[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...

2016-05-31 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65231642
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReduceHashTable.java
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeutils.SameTypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.RandomAccessInputView;
+import org.apache.flink.runtime.memory.AbstractPagedOutputView;
+import org.apache.flink.util.MathUtils;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * This hash table supports updating elements, and it also has 
processRecordWithReduce,
+ * which makes one reduce step with the given record.
+ *
+ * The memory is divided into three areas:
+ *  - Bucket area: they contain bucket heads:
+ *an 8 byte pointer to the first link of a linked list in the record 
area
+ *  - Record area: this contains the actual data in linked list elements. 
A linked list element starts
+ *with an 8 byte pointer to the next element, and then the record 
follows.
+ *  - Staging area: This is a small, temporary storage area for writing 
updated records. This is needed,
+ *because before serializing a record, there is no way to know in 
advance how large it will be.
+ *Therefore, we can't serialize directly into the record area when we 
are doing an update, because
+ *if it turns out to be larger than the old record, then it would 
overwrite some other record
+ *that happens to be after the old one in memory. The solution is to 
serialize to the staging area first,
+ *and then copy it to the place of the original if it has the same 
size, otherwise allocate a new linked
+ *list element at the end of the record area, and mark the old one as 
abandoned. This creates "holes" in
+ *the record area, so compactions are eventually needed.
+ *
+ *  Compaction happens by deleting everything in the bucket area, and then 
reinserting all elements.
+ *  The reinsertion happens by forgetting the structure (the linked lists) 
of the record area, and reading it
+ *  sequentially, and inserting all non-abandoned records, starting from 
the beginning of the record area.
+ *  Note that insertions never overwrite a record that has not been read 
by the reinsertion sweep, because
+ *  both the insertions and readings happen sequentially in the record 
area, and the insertions obviously
+ *  never overtake the reading sweep.
+ *
+ *  Note: we have to abandon the old linked list element even when the 
updated record has a smaller size
+ *  than the original, because otherwise we wouldn't know where the next 
record starts during a reinsertion
+ *  sweep.
+ *
+ *  The number of buckets depends on how large the records are. The 
serializer might be able to tell us this,
+ *  so in this case, we will calculate the number of buckets upfront, and 
won't do resizes.
+ *  If the serializer doesn't know the size, then we start with a small 
number of buckets, and do resizes as more
+ *  elements are inserted than the number of buckets.
+ *
+ *  The number of memory segments given to the staging area is usually 
one, because it just needs to hold
+ *  one record.
+ *

[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...

2016-05-31 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65231516
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -114,85 +118,133 @@ public void prepare() throws Exception {
 
MemoryManager memManager = this.taskContext.getMemoryManager();
final int numMemoryPages = memManager.computeNumberOfPages(
-   
this.taskContext.getTaskConfig().getRelativeMemoryDriver());
+   
this.taskContext.getTaskConfig().getRelativeMemoryDriver());
this.memory = 
memManager.allocatePages(this.taskContext.getOwningNepheleTask(), 
numMemoryPages);
 
-   // instantiate a fix-length in-place sorter, if possible, 
otherwise the out-of-place sorter
-   if (this.comparator.supportsSerializationWithKeyNormalization() 
&&
-   this.serializer.getLength() > 0 && 
this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
-   {
-   this.sorter = new 
FixedLengthRecordSorter(this.serializer, this.comparator, memory);
-   } else {
-   this.sorter = new 
NormalizedKeySorter(this.serializer, this.comparator.duplicate(), memory);
-   }
-
ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
this.objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
 
if (LOG.isDebugEnabled()) {
LOG.debug("ReduceCombineDriver object reuse: " + 
(this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
}
+
+   switch (strategy) {
+   case SORTED_PARTIAL_REDUCE:
+   // instantiate a fix-length in-place sorter, if 
possible, otherwise the out-of-place sorter
+   if 
(this.comparator.supportsSerializationWithKeyNormalization() &&
+   this.serializer.getLength() > 0 && 
this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING) {
+   this.sorter = new 
FixedLengthRecordSorter(this.serializer, this.comparator, memory);
--- End diff --

336490ccbcfc3b1a2be93eddd9cb531532a19a9e




[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...

2016-05-31 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65231501
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.hash.ReduceHashTable;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Chained version of ReduceCombineDriver.
+ */
+public class ChainedReduceCombineDriver extends ChainedDriver {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ChainedReduceCombineDriver.class);
+
+   /** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+
+   private AbstractInvokable parent;
+
+   private TypeSerializer serializer;
+
+   private TypeComparator comparator;
+
+   private ReduceFunction reducer;
+
+   private DriverStrategy strategy;
+
+   private InMemorySorter sorter;
+
+   private QuickSort sortAlgo = new QuickSort();
+
+   private ReduceHashTable table;
+
+   private List memory;
+
+   private volatile boolean canceled;
+
+   // 

+
+   @Override
+   public Function getStub() {
+   return reducer;
+   }
+
+   @Override
+   public String getTaskName() {
+   return taskName;
+   }
+
+   @Override
+   public void setup(AbstractInvokable parent) {
+   this.parent = parent;
+   canceled = false;
+
+   strategy = config.getDriverStrategy();
+
+   reducer = BatchTask.instantiateUserCode(config, 
userCodeClassLoader, ReduceFunction.class);
+   FunctionUtils.setFunctionRuntimeContext(reducer, 
getUdfRuntimeContext());
+   }
+
+   @Override
+   public void openTask() throws Exception {
+   // open the stub first
+   final Configuration stubConfig = config.getStubParameters();
+   BatchTask.openUserCode(reducer, stubConfig);
+
+   // instantiate the serializer / comparator
+   serializer = config.getInputSerializer(0, 
userCodeClassLoader).getSerializer();
+   comparator = config.getDriverComparator(0, 
userCodeClassLoader).createComparator();
+
+   MemoryManager memManager = 
parent.getEnvironment().getMemoryManager();
+   final int numMemoryPages = 
memManager.computeNumberOfPages(config.getRelativeMemoryDriver());
+   memory = memManager.allocatePages(parent, numMemoryPages);

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308227#comment-15308227
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65231386
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -42,34 +44,38 @@
  * Combine operator for Reduce functions, standalone (not chained).
  * Sorts and groups and reduces data, but never spills the sort. May 
produce multiple
  * partially aggregated groups.
- * 
+ *
  * @param  The data type consumed and produced by the combiner.
  */
 public class ReduceCombineDriver implements Driver {
-   
+
private static final Logger LOG = 
LoggerFactory.getLogger(ReduceCombineDriver.class);
 
/** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
-   
-   
+
+
private TaskContext taskContext;
 
private TypeSerializer serializer;
 
private TypeComparator comparator;
-   
+
private ReduceFunction reducer;
-   
+
private Collector output;
-   
+
+   private DriverStrategy strategy;
+
private InMemorySorter sorter;
-   
+
private QuickSort sortAlgo = new QuickSort();
 
+   private ReduceHashTable table;
+
private List memory;
 
-   private boolean running;
+   private volatile boolean canceled;
--- End diff --

> Can we keep the name of the flag as it is for now? All other drivers use 
> a running flag as well. 
> I would rather open a separate JIRA to fix the name in all drivers.

12e36ab93b7e7d94d497a6d718eba21ead813d7e

And here is the Jira: https://issues.apache.org/jira/browse/FLINK-3999


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This makes it possible to hold the preaggregated value in a 
> hash-table and update it with each function call. 
> The hash-based strategy requires a special implementation of an in-memory hash 
> table. The hash table should support in-place updates of elements (if the 
> updated value has the same size as the old value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out of memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.
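
For illustration, a minimal ReduceFunction of the kind described above (input and output type identical, applied incrementally), here summing the second tuple field per key:

{code}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;

// The running pre-aggregate for a key can be kept in the hash table and
// replaced by reduce(previousAggregate, newRecord) for every incoming record.
public class SumReducer implements ReduceFunction<Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
    }
}
{code}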



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308229#comment-15308229
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65231501
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedReduceCombineDriver.java
 ---
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.hash.ReduceHashTable;
+import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
+import org.apache.flink.runtime.operators.sort.InMemorySorter;
+import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
+import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Chained version of ReduceCombineDriver.
+ */
+public class ChainedReduceCombineDriver extends ChainedDriver {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ChainedReduceCombineDriver.class);
+
+   /** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
+   private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
+
+
+   private AbstractInvokable parent;
+
+   private TypeSerializer serializer;
+
+   private TypeComparator comparator;
+
+   private ReduceFunction reducer;
+
+   private DriverStrategy strategy;
+
+   private InMemorySorter sorter;
+
+   private QuickSort sortAlgo = new QuickSort();
+
+   private ReduceHashTable table;
+
+   private List memory;
+
+   private volatile boolean canceled;
+
+   // 

+
+   @Override
+   public Function getStub() {
+   return reducer;
+   }
+
+   @Override
+   public String getTaskName() {
+   return taskName;
+   }
+
+   @Override
+   public void setup(AbstractInvokable parent) {
+   this.parent = parent;
+   canceled = false;
+
+   strategy = config.getDriverStrategy();
+
+   reducer = BatchTask.instantiateUserCode(config, 
userCodeClassLoader, ReduceFunction.class);
+   FunctionUtils.setFunctionRuntimeContext(reducer, 
getUdfRuntimeContext());
+   }
+
+   @Override
+   public void openTask() throws Exception {
+   // open the stub first
+   final Configuration stubConfig = config.getStubParameters();
+   BatchTask.openUserCode(reducer, stubConfig);
+
+   // instantiate the serializer / comparator
+   serializer = config.getInputSerializer(0, 
userCodeClassLoader).getSerializer();
+   comparator = config.getDriverComparator(0, 
userCodeClassLoader).createComparator();
+
+   MemoryManager 

[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308230#comment-15308230
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65231516
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -114,85 +118,133 @@ public void prepare() throws Exception {
 
MemoryManager memManager = this.taskContext.getMemoryManager();
final int numMemoryPages = memManager.computeNumberOfPages(
-   
this.taskContext.getTaskConfig().getRelativeMemoryDriver());
+   
this.taskContext.getTaskConfig().getRelativeMemoryDriver());
this.memory = 
memManager.allocatePages(this.taskContext.getOwningNepheleTask(), 
numMemoryPages);
 
-   // instantiate a fix-length in-place sorter, if possible, 
otherwise the out-of-place sorter
-   if (this.comparator.supportsSerializationWithKeyNormalization() 
&&
-   this.serializer.getLength() > 0 && 
this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
-   {
-   this.sorter = new 
FixedLengthRecordSorter(this.serializer, this.comparator, memory);
-   } else {
-   this.sorter = new 
NormalizedKeySorter(this.serializer, this.comparator.duplicate(), memory);
-   }
-
ExecutionConfig executionConfig = 
taskContext.getExecutionConfig();
this.objectReuseEnabled = 
executionConfig.isObjectReuseEnabled();
 
if (LOG.isDebugEnabled()) {
LOG.debug("ReduceCombineDriver object reuse: " + 
(this.objectReuseEnabled ? "ENABLED" : "DISABLED") + ".");
}
+
+   switch (strategy) {
+   case SORTED_PARTIAL_REDUCE:
+   // instantiate a fix-length in-place sorter, if 
possible, otherwise the out-of-place sorter
+   if 
(this.comparator.supportsSerializationWithKeyNormalization() &&
+   this.serializer.getLength() > 0 && 
this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING) {
+   this.sorter = new 
FixedLengthRecordSorter(this.serializer, this.comparator, memory);
--- End diff --

336490ccbcfc3b1a2be93eddd9cb531532a19a9e







[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...

2016-05-31 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65231386
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -42,34 +44,38 @@
  * Combine operator for Reduce functions, standalone (not chained).
  * Sorts and groups and reduces data, but never spills the sort. May 
produce multiple
  * partially aggregated groups.
- * 
+ *
  * @param  The data type consumed and produced by the combiner.
  */
 public class ReduceCombineDriver implements Driver {
-   
+
private static final Logger LOG = 
LoggerFactory.getLogger(ReduceCombineDriver.class);
 
/** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
-   
-   
+
+
private TaskContext taskContext;
 
private TypeSerializer serializer;
 
private TypeComparator comparator;
-   
+
private ReduceFunction reducer;
-   
+
private Collector output;
-   
+
+   private DriverStrategy strategy;
+
private InMemorySorter sorter;
-   
+
private QuickSort sortAlgo = new QuickSort();
 
+   private ReduceHashTable table;
+
private List memory;
 
-   private boolean running;
+   private volatile boolean canceled;
--- End diff --

> Can we keep the name of the flag as it is for now? All other drivers use 
> a running flag as well. 
> I would rather open a separate JIRA to fix the name in all drivers.

12e36ab93b7e7d94d497a6d718eba21ead813d7e

And here is the Jira: https://issues.apache.org/jira/browse/FLINK-3999




[jira] [Created] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-05-31 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-3999:
--

 Summary: Rename the `running` flag in the drivers to `canceled`
 Key: FLINK-3999
 URL: https://issues.apache.org/jira/browse/FLINK-3999
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Reporter: Gabor Gevay
Priority: Trivial


The name of the {{running}} flag in the drivers doesn't reflect its usage: when 
the operator stops normally, it is not running anymore, but the {{running}} flag 
will still be true, since the flag is only set to false when cancelling.

It should be renamed, and the value inverted.
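
Schematically (not actual driver code), the renaming amounts to storing the same information with the inverted meaning:

{code}
public class DriverFlagSketch {
    // before: private volatile boolean running = true;  // stays true after a normal finish
    private volatile boolean canceled;                    // false until cancel() is called

    public void cancel() {
        canceled = true;       // before: running = false;
    }

    public void run() {
        while (!canceled) {    // before: while (running)
            // process the next record ...
            break;             // toy body so the sketch terminates
        }
    }
}
{code}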





[jira] [Commented] (FLINK-1979) Implement Loss Functions

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308199#comment-15308199
 ] 

ASF GitHub Bot commented on FLINK-1979:
---

Github user skavulya commented on the pull request:

https://github.com/apache/flink/pull/1985
  
@tillrohrmann @thvasilo I made the changes you recommended. Please let me 
know if they look ok.


> Implement Loss Functions
> 
>
> Key: FLINK-1979
> URL: https://issues.apache.org/jira/browse/FLINK-1979
> Project: Flink
>  Issue Type: Improvement
>  Components: Machine Learning Library
>Reporter: Johannes Günther
>Assignee: Johannes Günther
>Priority: Minor
>  Labels: ML
>
> For convex optimization problems, optimizer methods like SGD rely on a 
> pluggable implementation of a loss function and its first derivative.
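
For context, that contract boils down to a value plus its first derivative with respect to the prediction; a minimal sketch with made-up interface names (the flink-ml API may differ):

{code}
// Hypothetical interface, only to illustrate "loss plus first derivative".
interface LossFunctionSketch {
    double loss(double prediction, double label);
    double derivative(double prediction, double label);
}

// Squared loss: L(p, y) = 0.5 * (p - y)^2, so dL/dp = p - y.
class SquaredLossSketch implements LossFunctionSketch {
    @Override
    public double loss(double prediction, double label) {
        double diff = prediction - label;
        return 0.5 * diff * diff;
    }

    @Override
    public double derivative(double prediction, double label) {
        return prediction - label;
    }
}
{code}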





[GitHub] flink pull request: [FLINK-1979] Add logistic loss, hinge loss and regulariz...

2016-05-31 Thread skavulya
Github user skavulya commented on the pull request:

https://github.com/apache/flink/pull/1985
  
@tillrohrmann @thvasilo I made the changes you recommended. Please let me 
know if they look ok.




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308205#comment-15308205
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65229499
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -42,34 +44,38 @@
  * Combine operator for Reduce functions, standalone (not chained).
  * Sorts and groups and reduces data, but never spills the sort. May 
produce multiple
  * partially aggregated groups.
- * 
+ *
  * @param  The data type consumed and produced by the combiner.
  */
 public class ReduceCombineDriver implements Driver {
-   
+
private static final Logger LOG = 
LoggerFactory.getLogger(ReduceCombineDriver.class);
 
/** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
-   
-   
+
+
private TaskContext taskContext;
 
private TypeSerializer serializer;
 
private TypeComparator comparator;
-   
+
private ReduceFunction reducer;
-   
+
private Collector output;
-   
+
+   private DriverStrategy strategy;
+
private InMemorySorter sorter;
-   
+
private QuickSort sortAlgo = new QuickSort();
 
+   private ReduceHashTable table;
+
private List memory;
 
-   private boolean running;
+   private volatile boolean canceled;
--- End diff --

Oops, it's `volatile` in the other drivers as well, sorry! So then it can 
be just volatile here as well, right?







[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...

2016-05-31 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65229499
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -42,34 +44,38 @@
  * Combine operator for Reduce functions, standalone (not chained).
  * Sorts and groups and reduces data, but never spills the sort. May 
produce multiple
  * partially aggregated groups.
- * 
+ *
  * @param  The data type consumed and produced by the combiner.
  */
 public class ReduceCombineDriver implements Driver {
-   
+
private static final Logger LOG = 
LoggerFactory.getLogger(ReduceCombineDriver.class);
 
/** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
-   
-   
+
+
private TaskContext taskContext;
 
private TypeSerializer serializer;
 
private TypeComparator comparator;
-   
+
private ReduceFunction reducer;
-   
+
private Collector output;
-   
+
+   private DriverStrategy strategy;
+
private InMemorySorter sorter;
-   
+
private QuickSort sortAlgo = new QuickSort();
 
+   private ReduceHashTable table;
+
private List memory;
 
-   private boolean running;
+   private volatile boolean canceled;
--- End diff --

Oops, it's `volatile` in the other drivers as well, sorry! So then it can 
be just volatile here as well, right?




[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308191#comment-15308191
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65228213
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
 ---
@@ -135,10 +136,37 @@ public UnsortedGrouping(DataSet set, Keys keys) 
{
 * @see DataSet
 */
public ReduceOperator reduce(ReduceFunction reducer) {
+   return reduce(Utils.getCallLocationName(), reducer, 
CombineHint.OPTIMIZER_CHOOSES);
+   }
+
+   /**
+* Applies a Reduce transformation on a grouped {@link DataSet}.
+* For each group, the transformation consecutively calls a {@link 
org.apache.flink.api.common.functions.RichReduceFunction}
+*   until only a single element for each group remains.
+* A ReduceFunction combines two elements into one new element of the 
same type.
+*
+* @param reducer The ReduceFunction that is applied on each group of 
the DataSet.
+* @param strategy The strategy that should be used to execute the 
combine phase of the reduce.
+* If {@code null} is given, then the optimizer will 
pick the strategy.
+* @return A ReduceOperator that represents the reduced DataSet.
+*
+* @see org.apache.flink.api.common.functions.RichReduceFunction
+* @see ReduceOperator
+* @see DataSet
+*/
+   public ReduceOperator reduce(ReduceFunction reducer, CombineHint 
strategy) {
--- End diff --

e7851693dd261efb6eb1fcde12b04fcf384bd0d4
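
A usage sketch against the overload shown in the diff above; the CombineHint values (OPTIMIZER_CHOOSES appears in the diff, HASH/SORT per the issue description) and the enum's package are whatever this PR introduces:

{code}
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
// import of CombineHint omitted: its package is defined by this PR

public class CombineHintUsageSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> words = env.fromElements(
                new Tuple2<>("a", 1), new Tuple2<>("b", 1), new Tuple2<>("a", 1));

        // the second argument is the explicit combine strategy for the reduce
        words.groupBy(0)
             .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                 @Override
                 public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1,
                                                       Tuple2<String, Integer> v2) {
                     return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
                 }
             }, CombineHint.HASH)
             .print();
    }
}
{code}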


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.
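As a rough illustration of the combine idea described above (not the actual managed-memory hash table, which also supports in-place and appending updates plus eviction), the pre-aggregated value per key can be held in a map and folded with each incoming record; the key type {{K}} and the plain {{HashMap}} are simplifications:

{code}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.ReduceFunction;

// Simplified sketch of the hash-based combiner idea: one pre-aggregated value per key.
public class HashCombineSketch<K, T> {

	private final Map<K, T> preAggregated = new HashMap<>();

	/** Folds the incoming record into the pre-aggregated value of its key. */
	public void insert(K key, T record, ReduceFunction<T> reducer) throws Exception {
		T current = preAggregated.get(key);
		// reduce() consumes and produces the same type, so the result simply
		// replaces the previous pre-aggregated value for this key
		preAggregated.put(key, current == null ? record : reducer.reduce(current, record));
	}

	/** Emits all pre-aggregated values, e.g. at end of input or when memory is exhausted. */
	public Iterable<T> emitAll() {
		return preAggregated.values();
	}
}
{code}

With the {{HASH}}/{{SORT}} hints from the pull request discussed above, the strategy would then be selected roughly as {{grouping.reduce(reducer, CombineHint.HASH)}}.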



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308193#comment-15308193
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65228287
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

b01fd497fe5e2cf85b71fa8a7b4ec60b5e47d1c5


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...

2016-05-31 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65228287
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

b01fd497fe5e2cf85b71fa8a7b4ec60b5e47d1c5


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...

2016-05-31 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65228213
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/operators/UnsortedGrouping.java
 ---
@@ -135,10 +136,37 @@ public UnsortedGrouping(DataSet set, Keys keys) 
{
 * @see DataSet
 */
public ReduceOperator reduce(ReduceFunction reducer) {
+   return reduce(Utils.getCallLocationName(), reducer, 
CombineHint.OPTIMIZER_CHOOSES);
+   }
+
+   /**
+* Applies a Reduce transformation on a grouped {@link DataSet}.
+* For each group, the transformation consecutively calls a {@link 
org.apache.flink.api.common.functions.RichReduceFunction}
+*   until only a single element for each group remains.
+* A ReduceFunction combines two elements into one new element of the 
same type.
+*
+* @param reducer The ReduceFunction that is applied on each group of 
the DataSet.
+* @param strategy The strategy that should be used to execute the 
combine phase of the reduce.
+* If {@code null} is given, then the optimizer will 
pick the strategy.
+* @return A ReduceOperator that represents the reduced DataSet.
+*
+* @see org.apache.flink.api.common.functions.RichReduceFunction
+* @see ReduceOperator
+* @see DataSet
+*/
+   public ReduceOperator reduce(ReduceFunction reducer, CombineHint 
strategy) {
--- End diff --

e7851693dd261efb6eb1fcde12b04fcf384bd0d4


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3806) Revert use of DataSet.count() in Gelly

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308181#comment-15308181
 ] 

ASF GitHub Bot commented on FLINK-3806:
---

Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/2036
  
All comments should now be addressed.


> Revert use of DataSet.count() in Gelly
> --
>
> Key: FLINK-3806
> URL: https://issues.apache.org/jira/browse/FLINK-3806
> Project: Flink
>  Issue Type: Improvement
>  Components: Gelly
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Critical
> Fix For: 1.1.0
>
>
> FLINK-1632 replaced {{GraphUtils.count}} with {{DataSetUtils.count}}. The 
> former returns a {{DataSet}} while the latter executes a job to return a Java 
> value.
> {{DataSetUtils.count}} is called from {{Graph.numberOfVertices}} and 
> {{Graph.numberOfEdges}} which are called from {{GatherSumApplyIteration}} and 
> {{ScatterGatherIteration}} as well as the {{PageRank}} algorithms when the 
> user does not pass the number of vertices as a parameter.
> As noted in FLINK-1632, this does make the code simpler but, if my 
> understanding is correct, will materialize the Graph twice. The Graph will 
> need to be reread from input, regenerated, or recomputed by preceding 
> algorithms.
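For illustration only (a hedged sketch, not the Gelly code itself): an eager count such as {{DataSet#count()}} runs a separate job and returns a Java value, while a count expressed as a {{DataSet}} stays lazy and is evaluated with the rest of the program, so the input is materialized only once:

{code}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class LazyVsEagerCount {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Stand-in for a DataSet that is expensive to materialize (e.g. graph vertices).
		DataSet<Long> vertices = env.generateSequence(1, 1000000);

		// Eager: count() triggers a job right here, so the input is materialized
		// once for the count and again when the rest of the program executes.
		long eagerCount = vertices.count();

		// Lazy: the count is itself a DataSet and is computed within the same job
		// as the rest of the program.
		DataSet<Long> lazyCount = vertices
			.map(new MapFunction<Long, Long>() {
				@Override
				public Long map(Long value) {
					return 1L;
				}
			})
			.reduce(new ReduceFunction<Long>() {
				@Override
				public Long reduce(Long a, Long b) {
					return a + b;
				}
			});

		lazyCount.print();
		System.out.println("eager count: " + eagerCount);
	}
}
{code}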



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3806] [gelly] Revert use of DataSet.count()

2016-05-31 Thread greghogan
Github user greghogan commented on the pull request:

https://github.com/apache/flink/pull/2036
  
All comments should now be addressed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [hotfix] Fix JSONDeserializationSchema for Kafka; Parame...

2016-05-31 Thread rmetzger
GitHub user rmetzger opened a pull request:

https://github.com/apache/flink/pull/2057

[hotfix] Fix JSONDeserializationSchema for Kafka; ParameterTool usability

The JSONDeserializationSchema was implementing the 
KeyedDeserializationSchema. However, it was not using the key or any other 
additional metadata.
Therefore, I changed the base class.

Also, the `ParameterTool`'s exception for reading a `.properties` file has 
been improved.
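
Roughly the idea (a hedged sketch, not necessarily the exact code of the hotfix): a JSON schema that only needs the message bytes can extend the plain `AbstractDeserializationSchema` instead of implementing `KeyedDeserializationSchema`:

```java
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;

// Sketch: deserializes the message bytes into a Jackson ObjectNode; the Kafka
// key and metadata are never needed, so the non-keyed base class is sufficient.
public class JsonObjectDeserializationSchema extends AbstractDeserializationSchema<ObjectNode> {

	private static final long serialVersionUID = 1L;

	// created lazily on the task side, on first use
	private transient ObjectMapper mapper;

	@Override
	public ObjectNode deserialize(byte[] message) throws IOException {
		if (mapper == null) {
			mapper = new ObjectMapper();
		}
		return mapper.readValue(message, ObjectNode.class);
	}
}
```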

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rmetzger/flink mini-hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2057.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2057


commit 6f334b81fd17d80a46d62797e918a098406cf6a0
Author: Robert Metzger 
Date:   2016-05-31T16:50:35Z

[hotfix] Fix JSONDeserializationSchema for Kafka; ParameterTool usability




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...

2016-05-31 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65219886
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

> Would it be OK if I added optional arguments instead of the overloads 
like this?

Oops, this doesn't work. I'm getting
```
Error:(45, 7) in class GroupedDataSet, multiple overloaded alternatives of 
reduce define default arguments
```

An alternative solution would be, instead of just wrapping in a Scala 
`DataSet`, to create a Scala class that wraps `ReduceOperator`, has the same 
`setCombineHint` method, and inherits from the Scala `DataSet`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308080#comment-15308080
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65219886
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/GroupedDataSet.scala ---
@@ -282,27 +283,57 @@ class GroupedDataSet[T: ClassTag](
   }
 
   /**
-   * Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
-   * using an associative reduce function.
-   */
+* Creates a new [[DataSet]] by merging the elements of each group 
(elements with the same key)
+* using an associative reduce function.
+*/
   def reduce(fun: (T, T) => T): DataSet[T] = {
+reduce(getCallLocationName(), fun, CombineHint.OPTIMIZER_CHOOSES)
+  }
+
+  /**
+   * Special [[reduce]] operation for explicitly telling the system what 
strategy to use for the
+   * combine phase.
+   * If null is given as the strategy, then the optimizer will pick the 
strategy.
+   */
+  def reduce(fun: (T, T) => T, strategy: CombineHint): DataSet[T] = {
--- End diff --

> Would it be OK if I added optional arguments instead of the overloads 
like this?

Oops, this doesn't work. I'm getting
```
Error:(45, 7) in class GroupedDataSet, multiple overloaded alternatives of 
reduce define default arguments
```

An alternative solution would be, instead of just wrapping in a Scala 
`DataSet`, to create a Scala class that wraps `ReduceOperator`, has the same 
`setCombineHint` method, and inherits from the Scala `DataSet`.


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-3998) Access to gauges and counters in StatsDReporter#report() is not properly synchronized

2016-05-31 Thread Ted Yu (JIRA)
Ted Yu created FLINK-3998:
-

 Summary: Access to gauges and counters in StatsDReporter#report() 
is not properly synchronized
 Key: FLINK-3998
 URL: https://issues.apache.org/jira/browse/FLINK-3998
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu


{code}
for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
  reportGauge(entry.getValue(), entry.getKey());
}

for (Map.Entry<Counter, String> entry : counters.entrySet()) {
  reportCounter(entry.getValue(), entry.getKey());
{code}
Access to gauges and counters should be protected by lock on 
AbstractReporter.this
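
A minimal sketch of the suggested fix, shown as the method body only and assuming (as the report implies) that the add/remove path in {{AbstractReporter}} synchronizes on the reporter instance:

{code}
public void report() {
	// AbstractReporter mutates these maps while holding a lock on itself when
	// metrics are added or removed, so the iteration has to take the same lock.
	synchronized (this) {
		for (Map.Entry<Gauge<?>, String> entry : gauges.entrySet()) {
			reportGauge(entry.getValue(), entry.getKey());
		}

		for (Map.Entry<Counter, String> entry : counters.entrySet()) {
			reportCounter(entry.getValue(), entry.getKey());
		}
	}
}
{code}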



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3477) Add hash-based combine strategy for ReduceFunction

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308038#comment-15308038
 ] 

ASF GitHub Bot commented on FLINK-3477:
---

Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65216454
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -42,34 +44,38 @@
  * Combine operator for Reduce functions, standalone (not chained).
  * Sorts and groups and reduces data, but never spills the sort. May 
produce multiple
  * partially aggregated groups.
- * 
+ *
  * @param  The data type consumed and produced by the combiner.
  */
 public class ReduceCombineDriver implements Driver {
-   
+
private static final Logger LOG = 
LoggerFactory.getLogger(ReduceCombineDriver.class);
 
/** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
-   
-   
+
+
private TaskContext taskContext;
 
private TypeSerializer serializer;
 
private TypeComparator comparator;
-   
+
private ReduceFunction reducer;
-   
+
private Collector output;
-   
+
+   private DriverStrategy strategy;
+
private InMemorySorter sorter;
-   
+
private QuickSort sortAlgo = new QuickSort();
 
+   private ReduceHashTable table;
+
private List memory;
 
-   private boolean running;
+   private volatile boolean canceled;
--- End diff --

I have now looked more into this `volatile` stuff, and the problem is that 
further investigation would be needed to ensure that the variable being 
`volatile` doesn't have a bad performance effect (see [1]). So maybe the best 
option for now is to make it non-volatile (as it is in all the other drivers) 
and just hope that it doesn't blow up. Should I open a Jira for doing further 
investigation and then maybe changing all the drivers?

[1] http://brooker.co.za/blog/2012/09/10/volatile.html
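
For context, the trade-off is roughly the one in this simplified sketch (hypothetical names): the flag is written once by `cancel()` but read on every iteration of the driver's hot loop, and it is that per-record volatile read whose cost would need measuring:

```java
// Heavily simplified sketch of the driver pattern under discussion.
public class DriverLoopSketch {

	// volatile makes the cancel() write visible to run(), at the price of a
	// volatile read on every loop iteration.
	private volatile boolean canceled;

	public void run() throws Exception {
		while (!canceled && hasNext()) {
			processNext();
		}
	}

	public void cancel() {
		// invoked from a different thread than run()
		canceled = true;
	}

	private boolean hasNext() { return false; }  // placeholder
	private void processNext() { }               // placeholder
}
```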


> Add hash-based combine strategy for ReduceFunction
> --
>
> Key: FLINK-3477
> URL: https://issues.apache.org/jira/browse/FLINK-3477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Local Runtime
>Reporter: Fabian Hueske
>Assignee: Gabor Gevay
>
> This issue is about adding a hash-based combine strategy for ReduceFunctions.
> The interface of the {{reduce()}} method is as follows:
> {code}
> public T reduce(T v1, T v2)
> {code}
> Input type and output type are identical and the function returns only a 
> single value. A Reduce function is incrementally applied to compute a final 
> aggregated value. This allows to hold the preaggregated value in a hash-table 
> and update it with each function call. 
> The hash-based strategy requires special implementation of an in-memory hash 
> table. The hash table should support in place updates of elements (if the 
> updated value has the same size as the new value) but also appending updates 
> with invalidation of the old value (if the binary length of the new value 
> differs). The hash table needs to be able to evict and emit all elements if 
> it runs out-of-memory.
> We should also add {{HASH}} and {{SORT}} compiler hints to 
> {{DataSet.reduce()}} and {{Grouping.reduce()}} to allow users to pick the 
> execution strategy.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308039#comment-15308039
 ] 

ASF GitHub Bot commented on FLINK-2314:
---

Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/2020
  
The changes look good; we just have to figure out what to do about the 
methods on `StreamExecutionEnvironment`. One thing I'd like to get rid of is 
that `CheckpointableInputFormat.getCurrentState` returns a Tuple that contains 
the split. The split is never used internally; it is only stored so that it can 
be returned from this method, but the read operator itself also stores the 
split, so I think it is redundant.


> Make Streaming File Sources Persistent
> --
>
> Key: FLINK-2314
> URL: https://issues.apache.org/jira/browse/FLINK-2314
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Kostas Kloudas
>
> Streaming File sources should participate in the checkpointing. They should 
> track the bytes they read from the file and checkpoint it.
> One can look at the sequence generating source function for an example of a 
> checkpointed source.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent

2016-05-31 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/2020
  
The changes look good; we just have to figure out what to do about the 
methods on `StreamExecutionEnvironment`. One thing I'd like to get rid of is 
that `CheckpointableInputFormat.getCurrentState` returns a Tuple that contains 
the split. The split is never used internally; it is only stored so that it can 
be returned from this method, but the read operator itself also stores the 
split, so I think it is redundant.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-3477] [runtime] Add hash-based combine strategy f...

2016-05-31 Thread ggevay
Github user ggevay commented on a diff in the pull request:

https://github.com/apache/flink/pull/1517#discussion_r65216454
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceCombineDriver.java
 ---
@@ -42,34 +44,38 @@
  * Combine operator for Reduce functions, standalone (not chained).
  * Sorts and groups and reduces data, but never spills the sort. May 
produce multiple
  * partially aggregated groups.
- * 
+ *
  * @param  The data type consumed and produced by the combiner.
  */
 public class ReduceCombineDriver implements Driver {
-   
+
private static final Logger LOG = 
LoggerFactory.getLogger(ReduceCombineDriver.class);
 
/** Fix length records with a length below this threshold will be 
in-place sorted, if possible. */
private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
-   
-   
+
+
private TaskContext taskContext;
 
private TypeSerializer serializer;
 
private TypeComparator comparator;
-   
+
private ReduceFunction reducer;
-   
+
private Collector output;
-   
+
+   private DriverStrategy strategy;
+
private InMemorySorter sorter;
-   
+
private QuickSort sortAlgo = new QuickSort();
 
+   private ReduceHashTable table;
+
private List memory;
 
-   private boolean running;
+   private volatile boolean canceled;
--- End diff --

I have now looked more into this `volatile` stuff, and the problem is that 
further investigation would be needed to ensure that the variable being 
`volatile` doesn't have a bad performance effect (see [1]). So maybe the best 
option for now is to make it non-volatile (as it is in all the other drivers) 
and just hope that it doesn't blow up. Should I open a Jira for doing further 
investigation and then maybe changing all the drivers?

[1] http://brooker.co.za/blog/2012/09/10/volatile.html


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent

2016-05-31 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r65216093
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
 ---
@@ -104,8 +106,21 @@ public void runCheckpointedProgram() {
postSubmit();
}
catch (Exception e) {
+   Throwable th = e;
--- End diff --

What was the reason for this again? 😅 I don't remember.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308035#comment-15308035
 ] 

ASF GitHub Bot commented on FLINK-2314:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r65216093
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
 ---
@@ -104,8 +106,21 @@ public void runCheckpointedProgram() {
postSubmit();
}
catch (Exception e) {
+   Throwable th = e;
--- End diff --

What was the reason for this again?  I don't remember.


> Make Streaming File Sources Persistent
> --
>
> Key: FLINK-2314
> URL: https://issues.apache.org/jira/browse/FLINK-2314
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Kostas Kloudas
>
> Streaming File sources should participate in the checkpointing. They should 
> track the bytes they read from the file and checkpoint it.
> One can look at the sequence generating source function for an example of a 
> checkpointed source.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3993) [py] Add generateSequence() support to Python API

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3993?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308013#comment-15308013
 ] 

ASF GitHub Bot commented on FLINK-3993:
---

Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/2055
  
Please add them as a separate commit; they will be squashed when merging.

Good point regarding frm.


> [py] Add generateSequence() support to Python API
> -
>
> Key: FLINK-3993
> URL: https://issues.apache.org/jira/browse/FLINK-3993
> Project: Flink
>  Issue Type: Improvement
>  Components: Python API
>Affects Versions: 1.0.3
>Reporter: Omar Alvarez
>Priority: Minor
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Right now, I believe only from_elements() is available for creating a sequence 
> of numbers. It would be useful to be able to create a list of numbers from the 
> Python API as well, not just from the Java API. It would not be complicated, 
> since we already have generateSequence(). I am already working on this and will 
> create a pull request shortly on GitHub.
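
For reference, a hedged sketch of the Java-side call that the proposed Python binding would mirror (the Python signature itself is left to the upcoming pull request):

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class GenerateSequenceExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Lazily creates the numbers 1..100 as a parallel DataSet<Long>.
		DataSet<Long> numbers = env.generateSequence(1, 100);

		numbers.print();
	}
}
{code}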



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3993] [py] Add generateSequence() support to Pyth...

2016-05-31 Thread zentol
Github user zentol commented on the pull request:

https://github.com/apache/flink/pull/2055
  
Please add them as a separate commit; they will be squashed when merging.

Good point regarding frm.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308005#comment-15308005
 ] 

ASF GitHub Bot commented on FLINK-2314:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r65212938
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 ---
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the single (non-parallel) task which takes a {@link 
FileInputFormat} and is responsible for
+ * i) monitoring a user-provided path, ii) deciding which files should be 
further read and processed,
+ * iii) creating the {@link FileInputSplit FileInputSplits} corresponding 
to those files, and iv) assigning
+ * them to downstream tasks for further reading and processing. Which 
splits will be further processed
+ * depends on the user-provided {@link ProcessingMode} and the {@link 
FilePathFilter}.
+ * The splits of the files to be read are then forwarded to the downstream
+ * {@link ContinuousFileReaderOperator} which can have parallelism greater 
than one.
+ */
+@Internal
+public class ContinuousFileMonitoringFunction
+   extends RichSourceFunction implements 
Checkpointed>, Tuple2, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class);
+
+   /**
+* The minimum interval allowed between consecutive path scans. This is 
applicable if the
+* {@code watchType} is set to {@code PROCESS_CONTINUOUSLY}.
+*/
+   public static final long MIN_MONITORING_INTERVAL = 100l;
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum ProcessingMode {
--- End diff --

Also as `PublicEvolving`.


> Make Streaming File Sources Persistent
> --
>
> Key: FLINK-2314
> URL: https://issues.apache.org/jira/browse/FLINK-2314
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Kostas Kloudas
>
> Streaming File sources should participate in the checkpointing. They should 
> track the bytes they read from the file and checkpoint it.
> One can look at the sequence generating source function for an example of a 
> checkpointed source.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15308004#comment-15308004
 ] 

ASF GitHub Bot commented on FLINK-2314:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r65212915
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 ---
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the single (non-parallel) task which takes a {@link 
FileInputFormat} and is responsible for
+ * i) monitoring a user-provided path, ii) deciding which files should be 
further read and processed,
+ * iii) creating the {@link FileInputSplit FileInputSplits} corresponding 
to those files, and iv) assigning
+ * them to downstream tasks for further reading and processing. Which 
splits will be further processed
+ * depends on the user-provided {@link ProcessingMode} and the {@link 
FilePathFilter}.
+ * The splits of the files to be read are then forwarded to the downstream
+ * {@link ContinuousFileReaderOperator} which can have parallelism greater 
than one.
+ */
+@Internal
+public class ContinuousFileMonitoringFunction
+   extends RichSourceFunction implements 
Checkpointed>, Tuple2, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class);
+
+   /**
+* The minimum interval allowed between consecutive path scans. This is 
applicable if the
+* {@code watchType} is set to {@code PROCESS_CONTINUOUSLY}.
+*/
+   public static final long MIN_MONITORING_INTERVAL = 100l;
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum ProcessingMode {
--- End diff --

We should probably move this outside of this class and name it 
`FileProcessingMode`, just in case we want to change the actual file source 
again.
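
Something along these lines (a hedged sketch; the constant names follow the javadoc above, i.e. process once vs. keep monitoring the path), as a top-level `@PublicEvolving` enum:

```java
import org.apache.flink.annotation.PublicEvolving;

// Sketch of the suggested stand-alone enum, decoupled from the monitoring function.
@PublicEvolving
public enum FileProcessingMode {

	/** Process the files under the given path once and exit. */
	PROCESS_ONCE,

	/** Keep monitoring the path and react to new data. */
	PROCESS_CONTINUOUSLY
}
```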


> Make Streaming File Sources Persistent
> --
>
> Key: FLINK-2314
> URL: https://issues.apache.org/jira/browse/FLINK-2314
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Kostas Kloudas
>
> Streaming File sources should participate in the checkpointing. They should 
> track the bytes they read from the file and checkpoint it.
> One can look at the sequence generating source function for an example of a 
> checkpointed source.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent

2016-05-31 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r65212938
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 ---
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the single (non-parallel) task which takes a {@link 
FileInputFormat} and is responsible for
+ * i) monitoring a user-provided path, ii) deciding which files should be 
further read and processed,
+ * iii) creating the {@link FileInputSplit FileInputSplits} corresponding 
to those files, and iv) assigning
+ * them to downstream tasks for further reading and processing. Which 
splits will be further processed
+ * depends on the user-provided {@link ProcessingMode} and the {@link 
FilePathFilter}.
+ * The splits of the files to be read are then forwarded to the downstream
+ * {@link ContinuousFileReaderOperator} which can have parallelism greater 
than one.
+ */
+@Internal
+public class ContinuousFileMonitoringFunction
+   extends RichSourceFunction implements 
Checkpointed>, Tuple2, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class);
+
+   /**
+* The minimum interval allowed between consecutive path scans. This is 
applicable if the
+* {@code watchType} is set to {@code PROCESS_CONTINUOUSLY}.
+*/
+   public static final long MIN_MONITORING_INTERVAL = 100l;
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum ProcessingMode {
--- End diff --

Also as `PublicEvolving`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent

2016-05-31 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r65212915
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
 ---
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.io.FileInputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is the single (non-parallel) task which takes a {@link 
FileInputFormat} and is responsible for
+ * i) monitoring a user-provided path, ii) deciding which files should be 
further read and processed,
+ * iii) creating the {@link FileInputSplit FileInputSplits} corresponding 
to those files, and iv) assigning
+ * them to downstream tasks for further reading and processing. Which 
splits will be further processed
+ * depends on the user-provided {@link ProcessingMode} and the {@link 
FilePathFilter}.
+ * The splits of the files to be read are then forwarded to the downstream
+ * {@link ContinuousFileReaderOperator} which can have parallelism greater 
than one.
+ */
+@Internal
+public class ContinuousFileMonitoringFunction
+   extends RichSourceFunction implements 
Checkpointed>, Tuple2, Long>> {
+
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class);
+
+   /**
+* The minimum interval allowed between consecutive path scans. This is 
applicable if the
+* {@code watchType} is set to {@code PROCESS_CONTINUOUSLY}.
+*/
+   public static final long MIN_MONITORING_INTERVAL = 100l;
+
+   /**
+* Specifies when computation will be triggered.
+*/
+   public enum ProcessingMode {
--- End diff --

We should probably move this outside of this class and name it 
`FileProcessingMode`, just in case we want to change the actual file source 
again. 😉 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307995#comment-15307995
 ] 

ASF GitHub Bot commented on FLINK-2314:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r65212384
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 ---
@@ -483,28 +500,31 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 * @return The data stream that represents the data read from the given 
file
 */
   def readFile[T: TypeInformation](
-  inputFormat: FileInputFormat[T],
-  filePath: String,
-  watchType: WatchType,
-  interval: Long): DataStream[T] =
+inputFormat: FileInputFormat[T],
--- End diff --

IntelliJ formatting is off here, same below.


> Make Streaming File Sources Persistent
> --
>
> Key: FLINK-2314
> URL: https://issues.apache.org/jira/browse/FLINK-2314
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Kostas Kloudas
>
> Streaming File sources should participate in the checkpointing. They should 
> track the bytes they read from the file and checkpoint it.
> One can look at the sequence generating source function for an example of a 
> checkpointed source.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent

2016-05-31 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r65212384
  
--- Diff: 
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
 ---
@@ -483,28 +500,31 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 * @return The data stream that represents the data read from the given 
file
 */
   def readFile[T: TypeInformation](
-  inputFormat: FileInputFormat[T],
-  filePath: String,
-  watchType: WatchType,
-  interval: Long): DataStream[T] =
+inputFormat: FileInputFormat[T],
--- End diff --

IntelliJ formatting is off here, same below.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Closed] (FLINK-3886) Give a better error when the application Main class is not public.

2016-05-31 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-3886.
---

> Give a better error when the application Main class is not public.
> --
>
> Key: FLINK-3886
> URL: https://issues.apache.org/jira/browse/FLINK-3886
> Project: Flink
>  Issue Type: Improvement
>Reporter: Niels Basjes
>Assignee: Niels Basjes
> Fix For: 1.1.0
>
> Attachments: FLINK-3886-20160509.patch
>
>
> I wrote a Flink application and made the simple mistake of leaving the Main 
> class 'package private' by writing it as follows (note: I simply forgot the 
> 'public' keyword)
> {code}
> class Main {
>...
> }
> {code}
> The error you get is:
> {code}
> Caused by: java.lang.IllegalAccessException: Class 
> org.apache.flink.client.program.PackagedProgram can not access a member of 
> class com.bol.experiment.flink.Main with modifiers "public static"
>   at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
>   at 
> java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296)
>   at 
> java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288)
>   at java.lang.reflect.Method.invoke(Method.java:490)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> {code}
> It took me 30 minutes to figure out what I had done wrong.
> I think the error message should be more helpful to the developer.
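
A hedged sketch of the kind of up-front check that yields a clearer message (plain reflection; not necessarily the actual change made in {{PackagedProgram}}):

{code}
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Validates the entry point before invoking it, so the user gets a descriptive
// message instead of the raw IllegalAccessException.
public final class EntryPointChecker {

	public static void checkMainMethod(Class<?> entryClass) throws Exception {
		if (!Modifier.isPublic(entryClass.getModifiers())) {
			throw new RuntimeException("The class " + entryClass.getName()
				+ " must be declared public so that its main(String[]) method can be called.");
		}
		Method main = entryClass.getMethod("main", String[].class);
		if (!Modifier.isStatic(main.getModifiers())) {
			throw new RuntimeException("The main(String[]) method of " + entryClass.getName()
				+ " must be static.");
		}
	}

	private EntryPointChecker() {}
}
{code}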



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3978) Add hasBroadcastVariable method to RuntimeContext

2016-05-31 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-3978.
---

> Add hasBroadcastVariable method to RuntimeContext
> -
>
> Key: FLINK-3978
> URL: https://issues.apache.org/jira/browse/FLINK-3978
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.1.0
>
>
> The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an 
> exception if the accumulator does not exist or if the accumulator exists, but 
> with different type", although {{AbstractRuntimeUDFContext}} does not throw 
> an exception but will return null.
> The javadocs for {{getBroadcastVariable}} do not mention throwing an 
> exception. Currently, the only way to handle a broadcast variable that 
> may or may not exist is to catch and ignore the exception. Adding a 
> {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this 
> explicit.
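
A hedged usage sketch of what such a check enables inside a rich function, using the method name from the issue title; the broadcast variable name "thresholds" is made up for the example:

{code}
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Probes an optional broadcast variable instead of catching and ignoring the exception.
public class ThresholdFilterMapper extends RichMapFunction<Long, Long> {

	private List<Long> thresholds;

	@Override
	public void open(Configuration parameters) {
		if (getRuntimeContext().hasBroadcastVariable("thresholds")) {
			thresholds = getRuntimeContext().getBroadcastVariable("thresholds");
		} else {
			thresholds = Collections.emptyList();
		}
	}

	@Override
	public Long map(Long value) {
		return thresholds.contains(value) ? 0L : value;
	}
}
{code}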



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3886) Give a better error when the application Main class is not public.

2016-05-31 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-3886.
-
   Resolution: Fixed
Fix Version/s: 1.1.0

Fixed via 6db9e6ae5c2f4968687a3fcae0520e05442bd9ab

> Give a better error when the application Main class is not public.
> --
>
> Key: FLINK-3886
> URL: https://issues.apache.org/jira/browse/FLINK-3886
> Project: Flink
>  Issue Type: Improvement
>Reporter: Niels Basjes
>Assignee: Niels Basjes
> Fix For: 1.1.0
>
> Attachments: FLINK-3886-20160509.patch
>
>
> I wrote a Flink application and made the simple mistake of leaving the Main 
> class 'package private' by writing it as follows (note: I simply forgot the 
> 'public' keyword)
> {code}
> class Main {
>...
> }
> {code}
> The error you get is:
> {code}
> Caused by: java.lang.IllegalAccessException: Class 
> org.apache.flink.client.program.PackagedProgram can not access a member of 
> class com.bol.experiment.flink.Main with modifiers "public static"
>   at sun.reflect.Reflection.ensureMemberAccess(Reflection.java:102)
>   at 
> java.lang.reflect.AccessibleObject.slowCheckMemberAccess(AccessibleObject.java:296)
>   at 
> java.lang.reflect.AccessibleObject.checkAccess(AccessibleObject.java:288)
>   at java.lang.reflect.Method.invoke(Method.java:490)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> {code}
> It took me 30 minutes to figure out what I had done wrong.
> I think the error message should be more helpful to the developer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3979) [documentation]add missed import classes in run_example_quickstart

2016-05-31 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-3979.
-
   Resolution: Fixed
Fix Version/s: 1.1.0

Fixed via ad52a5f0cebe6e5d593be0ae40656b28470af106

Thank you for the contribution!

> [documentation]add missed import classes in run_example_quickstart
> --
>
> Key: FLINK-3979
> URL: https://issues.apache.org/jira/browse/FLINK-3979
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Jia Zhai
> Fix For: 1.1.0
>
>
> The classes that need to be imported  for this part of code
> {code}
> result
> .map(new MapFunction, String>() {
> @Override
> public String map(Tuple2 tuple) {
> return tuple.toString();
> }
> })
> .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new 
> SimpleStringSchema()));
> {code}
> are 
> {code}
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.functions.MapFunction;
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-3979) [documentation]add missed import classes in run_example_quickstart

2016-05-31 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3979?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-3979.
---

> [documentation]add missed import classes in run_example_quickstart
> --
>
> Key: FLINK-3979
> URL: https://issues.apache.org/jira/browse/FLINK-3979
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Jia Zhai
> Fix For: 1.1.0
>
>
> The classes that need to be imported  for this part of code
> {code}
> result
> .map(new MapFunction, String>() {
> @Override
> public String map(Tuple2 tuple) {
> return tuple.toString();
> }
> })
> .addSink(new FlinkKafkaProducer08<>("localhost:9092", "wiki-result", new 
> SimpleStringSchema()));
> {code}
> are 
> {code}
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.api.common.functions.MapFunction;
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-3978) Add hasBroadcastVariable method to RuntimeContext

2016-05-31 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3978?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-3978.
-
Resolution: Fixed

Fixed via 5b0287971fa2beda360105d96e7bfbc7a110fae7

> Add hasBroadcastVariable method to RuntimeContext
> -
>
> Key: FLINK-3978
> URL: https://issues.apache.org/jira/browse/FLINK-3978
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.1.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Minor
> Fix For: 1.1.0
>
>
> The javadocs for {{RuntimeContext}} state that {{getAccumulator}} "throws an 
> exception if the accumulator does not exist or if the accumulator exists, but 
> with different type", although {{AbstractRuntimeUDFContext}} does not throw 
> an exception but will return null.
> The javadocs for {{getBroadcastVariable}} do not mention throwing an 
> exception. Currently, the only way to handle a broadcast variable that 
> may or may not exist is to catch and ignore the exception. Adding a 
> {{containsBroadcastVariable}} method to {{RuntimeContext}} would make this 
> explicit.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307951#comment-15307951
 ] 

ASF GitHub Bot commented on FLINK-2314:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r65209503
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -897,21 +900,21 @@ public TimeCharacteristic 
getStreamTimeCharacteristic() {
 *  The mode in which the source should operate, i.e. 
monitor path and react to new data, or process once and exit
 * @return The DataStream containing the given directory.
 */
-   public DataStream readFileStream(String filePath, long 
intervalMillis, WatchType watchType) {
+   public DataStream readTextFile(String filePath, ProcessingMode 
watchType, long intervalMillis) {
Preconditions.checkNotNull(filePath, "The file path may not be 
null.");
 
TextInputFormat format = new TextInputFormat(new 
Path(filePath));
-   return readFile(format, filePath, watchType, intervalMillis);
+   return readFile(format, filePath, watchType, intervalMillis, 
FilePathFilter.DefaultFilter.getInstance());
}
 
/**
 * Reads the given file line-by-line and creates a data stream that 
contains a string with the contents of each such
 * line. The file will be read with the system's default character set.
 *
 * 
-*  NOTES ON CHECKPOINTING:  The source (which executes the 
{@link FileSplitMonitoringFunction})
+*  NOTES ON CHECKPOINTING:  The source (which executes the 
{@link ContinuousFileMonitoringFunction})
--- End diff --

Same as above, and in the javadoc below.
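
For context, a minimal sketch of how the overload being reviewed here might be called; the ProcessingMode names and the parameter order are taken from the diffs in this PR, while the import path of the enum is a guess, since the hunks do not show it:

{code}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// Assumption: the diff does not show where ProcessingMode lives on this branch.
import org.apache.flink.streaming.api.functions.source.ProcessingMode;

public class ContinuousTextFileJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Signature under review: readTextFile(filePath, watchType, intervalMillis).
        // PROCESS_CONTINUOUSLY re-scans the path every intervalMillis for new data;
        // PROCESS_ONCE would process the current contents once and exit.
        DataStream<String> lines = env.readTextFile(
                "file:///tmp/input",
                ProcessingMode.PROCESS_CONTINUOUSLY,
                1000L);

        lines.print();
        env.execute("Continuously read text files");
    }
}
{code}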


> Make Streaming File Sources Persistent
> --
>
> Key: FLINK-2314
> URL: https://issues.apache.org/jira/browse/FLINK-2314
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Kostas Kloudas
>
> Streaming File sources should participate in the checkpointing. They should 
> track the bytes they read from the file and checkpoint it.
> One can look at the sequence generating source function for an example of a 
> checkpointed source.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent

2016-05-31 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r65209503
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -897,21 +900,21 @@ public TimeCharacteristic 
getStreamTimeCharacteristic() {
 *  The mode in which the source should operate, i.e. 
monitor path and react to new data, or process once and exit
 * @return The DataStream containing the given directory.
 */
-   public DataStream<String> readFileStream(String filePath, long 
intervalMillis, WatchType watchType) {
+   public DataStream<String> readTextFile(String filePath, ProcessingMode 
watchType, long intervalMillis) {
Preconditions.checkNotNull(filePath, "The file path may not be 
null.");
 
TextInputFormat format = new TextInputFormat(new 
Path(filePath));
-   return readFile(format, filePath, watchType, intervalMillis);
+   return readFile(format, filePath, watchType, intervalMillis, 
FilePathFilter.DefaultFilter.getInstance());
}
 
/**
 * Reads the given file line-by-line and creates a data stream that 
contains a string with the contents of each such
 * line. The file will be read with the system's default character set.
 *
 * 
-*  NOTES ON CHECKPOINTING:  The source (which executes the 
{@link FileSplitMonitoringFunction})
+*  NOTES ON CHECKPOINTING:  The source (which executes the 
{@link ContinuousFileMonitoringFunction})
--- End diff --

Same as above, and in the javadoc below.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307948#comment-15307948
 ] 

ASF GitHub Bot commented on FLINK-2314:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r65209426
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -877,15 +880,15 @@ public TimeCharacteristic 
getStreamTimeCharacteristic() {
 
/**
 * Reads the given file line-by-line and creates a data stream that 
contains a string with the contents of each such
-* line. Depending on the provided {@link WatchType}, the source may 
periodically monitor (every {@code interval} ms)
-* the path for new data ({@link WatchType#REPROCESS_WITH_APPENDED}), 
or process once the data currently in the path and
-* exit ({@link WatchType#PROCESS_ONCE}).
+* line. Depending on the provided {@link ProcessingMode}, the source 
may periodically monitor (every {@code interval} ms)
+* the path for new data ({@link ProcessingMode#PROCESS_CONTINUOUSLY}), 
or process once the data currently in the path and
+* exit ({@link ProcessingMode#PROCESS_ONCE}).
 *
 * 
-*  NOTES ON CHECKPOINTING:  If the {@code watchType} is set to 
{@link WatchType#PROCESS_ONCE}, the source
-* (which executes the {@link FileSplitMonitoringFunction}) monitors 
the path once, creates the
+*  NOTES ON CHECKPOINTING:  If the {@code watchType} is set to 
{@link ProcessingMode#PROCESS_ONCE}, the source
+* (which executes the {@link ContinuousFileMonitoringFunction}) 
monitors the path once, creates the
--- End diff --

I think we can remove the "(which executes the {@link 
ContinuousFileMonitoringFunction})" part. The source does not execute this; it 
is itself the source. 


> Make Streaming File Sources Persistent
> --
>
> Key: FLINK-2314
> URL: https://issues.apache.org/jira/browse/FLINK-2314
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Kostas Kloudas
>
> Streaming File sources should participate in the checkpointing. They should 
> track the bytes they read from the file and checkpoint it.
> One can look at the sequence generating source function for an example of a 
> checkpointed source.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent

2016-05-31 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r65209426
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -877,15 +880,15 @@ public TimeCharacteristic 
getStreamTimeCharacteristic() {
 
/**
 * Reads the given file line-by-line and creates a data stream that 
contains a string with the contents of each such
-* line. Depending on the provided {@link WatchType}, the source may 
periodically monitor (every {@code interval} ms)
-* the path for new data ({@link WatchType#REPROCESS_WITH_APPENDED}), 
or process once the data currently in the path and
-* exit ({@link WatchType#PROCESS_ONCE}).
+* line. Depending on the provided {@link ProcessingMode}, the source 
may periodically monitor (every {@code interval} ms)
+* the path for new data ({@link ProcessingMode#PROCESS_CONTINUOUSLY}), 
or process once the data currently in the path and
+* exit ({@link ProcessingMode#PROCESS_ONCE}).
 *
 * 
-*  NOTES ON CHECKPOINTING:  If the {@code watchType} is set to 
{@link WatchType#PROCESS_ONCE}, the source
-* (which executes the {@link FileSplitMonitoringFunction}) monitors 
the path once, creates the
+*  NOTES ON CHECKPOINTING:  If the {@code watchType} is set to 
{@link ProcessingMode#PROCESS_ONCE}, the source
+* (which executes the {@link ContinuousFileMonitoringFunction}) 
monitors the path once, creates the
--- End diff --

I think we can remove the "(which executes the {@link 
ContinuousFileMonitoringFunction})" part. The source does not execute this; it 
is itself the source. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2314] Make Streaming File Sources Persistent

2016-05-31 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r65207037
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ---
@@ -82,6 +82,8 @@
 */
private static int MAX_SAMPLE_LEN;
 
+   private boolean restoring = false;
--- End diff --

That's probably left over from a previous change


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2314) Make Streaming File Sources Persistent

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307930#comment-15307930
 ] 

ASF GitHub Bot commented on FLINK-2314:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/2020#discussion_r65207037
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 ---
@@ -82,6 +82,8 @@
 */
private static int MAX_SAMPLE_LEN;
 
+   private boolean restoring = false;
--- End diff --

That's probably left over from a previous change


> Make Streaming File Sources Persistent
> --
>
> Key: FLINK-2314
> URL: https://issues.apache.org/jira/browse/FLINK-2314
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 0.9
>Reporter: Stephan Ewen
>Assignee: Kostas Kloudas
>
> Streaming File sources should participate in the checkpointing. They should 
> track the bytes they read from the file and checkpoint it.
> One can look at the sequence generating source function for an example of a 
> checkpointed source.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3994) Instable KNNITSuite

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307904#comment-15307904
 ] 

ASF GitHub Bot commented on FLINK-3994:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/2056
  
Thanks for clarifying @StephanEwen! I'll merge this after Travis succeeds.


> Instable KNNITSuite
> ---
>
> Key: FLINK-3994
> URL: https://issues.apache.org/jira/browse/FLINK-3994
> Project: Flink
>  Issue Type: Bug
>  Components: Machine Learning Library, Tests
>Affects Versions: 1.1.0
>Reporter: Chiwan Park
>Assignee: Chiwan Park
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.1.0
>
>
> KNNITSuite fails in Travis-CI with following error:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   ...
>   Cause: java.io.IOException: Insufficient number of network buffers: 
> required 32, but only 4 available. The total number of network buffers is 
> currently set to 2048. You can increase this number by setting the 
> configuration key 'taskmanager.network.numberOfBuffers'.
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
>   at 
> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:497)
>   at java.lang.Thread.run(Thread.java:745)
>   ...
> {code}
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064237/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064236/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064235/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134052961/log.txt
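
As a side note for anyone reproducing this locally, one way to raise the buffer count named in the exception is to pass a custom Configuration to a local environment. This is a generic sketch, not part of the fix discussed here, and the value 4096 is an arbitrary example:

{code}
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class LocalEnvWithMoreBuffers {

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Configuration key quoted in the stack trace above.
        config.setInteger("taskmanager.network.numberOfBuffers", 4096);

        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);
        env.fromElements(1, 2, 3).print();
    }
}
{code}

For a standalone cluster the same key would go into flink-conf.yaml instead.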



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-3994] [ml, tests] Fix flaky KNN integration tests

2016-05-31 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/2056
  
Thanks for clarifying @StephanEwen! I'll merge this after Travis succeeds.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3994) Instable KNNITSuite

2016-05-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15307899#comment-15307899
 ] 

ASF GitHub Bot commented on FLINK-3994:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/2056
  
There is actually a problem with the way the Scala tests are written:

The code in the class body that is outside the "it should" clauses is executed 
before the "before" function. That is why the tests run against a 
LocalEnvironment rather than the test context environment.

So Chiwan's patch will actually fix it, only for different reasons than 
initially expected.


> Instable KNNITSuite
> ---
>
> Key: FLINK-3994
> URL: https://issues.apache.org/jira/browse/FLINK-3994
> Project: Flink
>  Issue Type: Bug
>  Components: Machine Learning Library, Tests
>Affects Versions: 1.1.0
>Reporter: Chiwan Park
>Assignee: Chiwan Park
>Priority: Critical
>  Labels: test-stability
> Fix For: 1.1.0
>
>
> KNNITSuite fails in Travis-CI with following error:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:806)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:752)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>   ...
>   Cause: java.io.IOException: Insufficient number of network buffers: 
> required 32, but only 4 available. The total number of network buffers is 
> currently set to 2048. You can increase this number by setting the 
> configuration key 'taskmanager.network.numberOfBuffers'.
>   at 
> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:196)
>   at 
> org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:327)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:497)
>   at java.lang.Thread.run(Thread.java:745)
>   ...
> {code}
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064237/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064236/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134064235/log.txt
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/134052961/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

