[jira] [Commented] (FLINK-2013) Create generalized linear model framework

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16189249#comment-16189249 ]

ASF GitHub Bot commented on FLINK-2013:
---

Github user mtunique commented on a diff in the pull request:

https://github.com/apache/flink/pull/3756#discussion_r142308699
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
 ---
@@ -75,15 +75,16 @@ class MultipleLinearRegressionITSuite
 
 val parameters = ParameterMap()
 
-parameters.add(MultipleLinearRegression.Stepsize, 2.0)
-parameters.add(MultipleLinearRegression.Iterations, 10)
-parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+parameters.add(WithIterativeSolver.Stepsize, 2.0)
+parameters.add(WithIterativeSolver.Iterations, 10)
+parameters.add(WithIterativeSolver.ConvergenceThreshold, 0.001)
 
 mlr.fit(sparseInputDS, parameters)
 
 val weightList = mlr.weightsOption.get.collect()
 
 val WeightVector(weights, intercept) = weightList.head
+println(weightList)
--- End diff --

OK, I will delete it.


> Create generalized linear model framework
> -----------------------------------------
>
> Key: FLINK-2013
> URL: https://issues.apache.org/jira/browse/FLINK-2013
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Assignee: Theodore Vasiloudis
>  Labels: ML
>
> [Generalized linear 
> models|http://en.wikipedia.org/wiki/Generalized_linear_model] (GLMs) provide 
> an abstraction for many learning models that can be used for regression and 
> classification tasks.
> Some example GLMs are linear regression, logistic regression, LASSO and ridge 
> regression.
> The goal for this JIRA is to provide interfaces for the set of common 
> properties and functions these models share. 
> In order to achieve that, a design pattern similar to the one that 
> [sklearn|http://scikit-learn.org/stable/modules/linear_model.html] and 
> [MLlib|http://spark.apache.org/docs/1.3.0/mllib-linear-methods.html] employ 
> will be used.
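As a rough illustration of that pattern (hypothetical names, not FlinkML's actual interfaces), such a framework centers on a shared fit/predict contract plus a common weights-and-intercept representation:

{code}
// A minimal sketch, assuming hypothetical names; FlinkML's real traits differ.
interface Learner<M> {
M fit(double[][] features, double[] labels); // estimate parameters
}

interface Model {
double predict(double[] features); // score one feature vector
}

// All GLMs share the same parameter shape: a weight vector plus an intercept.
final class LinearModel implements Model {
private final double[] weights;
private final double intercept;

LinearModel(double[] weights, double intercept) {
this.weights = weights;
this.intercept = intercept;
}

@Override
public double predict(double[] features) {
double score = intercept;
for (int i = 0; i < features.length; i++) {
score += weights[i] * features[i];
}
// a link function would be applied here, e.g. for logistic regression
return score;
}
}
{code}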



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #3756: [FLINK-2013] Create generalized linear model frame...

2017-10-02 Thread mtunique
Github user mtunique commented on a diff in the pull request:

https://github.com/apache/flink/pull/3756#discussion_r142308699
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
 ---
@@ -75,15 +75,16 @@ class MultipleLinearRegressionITSuite
 
 val parameters = ParameterMap()
 
-parameters.add(MultipleLinearRegression.Stepsize, 2.0)
-parameters.add(MultipleLinearRegression.Iterations, 10)
-parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+parameters.add(WithIterativeSolver.Stepsize, 2.0)
+parameters.add(WithIterativeSolver.Iterations, 10)
+parameters.add(WithIterativeSolver.ConvergenceThreshold, 0.001)
 
 mlr.fit(sparseInputDS, parameters)
 
 val weightList = mlr.weightsOption.get.collect()
 
 val WeightVector(weights, intercept) = weightList.head
+println(weightList)
--- End diff --

OK, I will delete it.


---


[jira] [Commented] (FLINK-2013) Create generalized linear model framework

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16189241#comment-16189241 ]

ASF GitHub Bot commented on FLINK-2013:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3756#discussion_r142307430
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
 ---
@@ -75,15 +75,16 @@ class MultipleLinearRegressionITSuite
 
 val parameters = ParameterMap()
 
-parameters.add(MultipleLinearRegression.Stepsize, 2.0)
-parameters.add(MultipleLinearRegression.Iterations, 10)
-parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+parameters.add(WithIterativeSolver.Stepsize, 2.0)
+parameters.add(WithIterativeSolver.Iterations, 10)
+parameters.add(WithIterativeSolver.ConvergenceThreshold, 0.001)
 
 mlr.fit(sparseInputDS, parameters)
 
 val weightList = mlr.weightsOption.get.collect()
 
 val WeightVector(weights, intercept) = weightList.head
+println(weightList)
--- End diff --

remove this?


> Create generalized linear model framework
> -----------------------------------------
>
> Key: FLINK-2013
> URL: https://issues.apache.org/jira/browse/FLINK-2013
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Assignee: Theodore Vasiloudis
>  Labels: ML
>
> [Generalized linear 
> models|http://en.wikipedia.org/wiki/Generalized_linear_model] (GLMs) provide 
> an abstraction for many learning models that can be used for regression and 
> classification tasks.
> Some example GLMs are linear regression, logistic regression, LASSO and ridge 
> regression.
> The goal for this JIRA is to provide interfaces for the set of common 
> properties and functions these models share. 
> In order to achieve that, a design pattern similar to the one that 
> [sklearn|http://scikit-learn.org/stable/modules/linear_model.html] and 
> [MLlib|http://spark.apache.org/docs/1.3.0/mllib-linear-methods.html] employ 
> will be used.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-2013) Create generalized linear model framework

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16189242#comment-16189242 ]

ASF GitHub Bot commented on FLINK-2013:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/3756
  
+1, LGTM


> Create generalized linear model framework
> -----------------------------------------
>
> Key: FLINK-2013
> URL: https://issues.apache.org/jira/browse/FLINK-2013
> Project: Flink
>  Issue Type: New Feature
>  Components: Machine Learning Library
>Reporter: Theodore Vasiloudis
>Assignee: Theodore Vasiloudis
>  Labels: ML
>
> [Generalized linear 
> models|http://en.wikipedia.org/wiki/Generalized_linear_model] (GLMs) provide 
> an abstraction for many learning models that can be used for regression and 
> classification tasks.
> Some example GLMs are linear regression, logistic regression, LASSO and ridge 
> regression.
> The goal for this JIRA is to provide interfaces for the set of common 
> properties and functions these models share. 
> In order to achieve that, a design pattern similar to the one that 
> [sklearn|http://scikit-learn.org/stable/modules/linear_model.html] and 
> [MLlib|http://spark.apache.org/docs/1.3.0/mllib-linear-methods.html] employ 
> will be used.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #3756: [FLINK-2013] Create generalized linear model framework

2017-10-02 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/3756
  
+1, LGTM


---


[GitHub] flink pull request #3756: [FLINK-2013] Create generalized linear model frame...

2017-10-02 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/3756#discussion_r142307430
  
--- Diff: 
flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
 ---
@@ -75,15 +75,16 @@ class MultipleLinearRegressionITSuite
 
 val parameters = ParameterMap()
 
-parameters.add(MultipleLinearRegression.Stepsize, 2.0)
-parameters.add(MultipleLinearRegression.Iterations, 10)
-parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+parameters.add(WithIterativeSolver.Stepsize, 2.0)
+parameters.add(WithIterativeSolver.Iterations, 10)
+parameters.add(WithIterativeSolver.ConvergenceThreshold, 0.001)
 
 mlr.fit(sparseInputDS, parameters)
 
 val weightList = mlr.weightsOption.get.collect()
 
 val WeightVector(weights, intercept) = weightList.head
+println(weightList)
--- End diff --

remove this?


---


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16189176#comment-16189176 ]

ASF GitHub Bot commented on FLINK-7072:
---

Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142228578
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * A deployment descriptor for an existing cluster.
+ */
+public class Flip6StandaloneClusterDescriptor implements 
ClusterDescriptor {
+
+   private final Configuration config;
+
+   public Flip6StandaloneClusterDescriptor(Configuration config) {
+   this.config = config;
+   }
+
+   @Override
+   public String getClusterDescription() {
+   String host = config.getString(JobManagerOptions.ADDRESS, "");
+   int port = config.getInteger(JobManagerOptions.PORT, -1);
+   return "FLIP-6 Standalone cluster at " + host + ":" + port;
+   }
+
+   @Override
+   public RestClusterClient retrieve(String applicationID) {
+   try {
+   return new RestClusterClient(config);
+   } catch (Exception e) {
+   throw new RuntimeException("Couldn't retrieve 
standalone cluster", e);
--- End diff --

How about adding a `Flip6ClusterException` to wrap this exception?
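For reference, a minimal sketch of such a wrapper, assuming the name from the suggestion (no such class exists in the codebase):

```
// Hypothetical checked exception for cluster-retrieval failures; the name
// Flip6ClusterException is taken from the suggestion above, not from Flink.
public class Flip6ClusterException extends Exception {
    public Flip6ClusterException(String message, Throwable cause) {
        super(message, cause);
    }
}
```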


> Create RESTful cluster endpoint
> -------------------------------
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to communicate with the cluster from the RESTful client, we have to 
> implement a RESTful cluster endpoint. The endpoint shall support the 
> following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Lookup job leader (GET): Gets the JM leader for the given job
> This endpoint will run in session mode alongside the dispatcher/session 
> runner and forward calls to that component, which maintains a view of all 
> currently running jobs.
> In the per-job mode, the endpoint will return only the single running job and 
> the address of the JobManager alongside which it is running. Furthermore, it 
> won't accept job submissions.
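For illustration, a minimal sketch of driving such an endpoint through the RestClusterClient from this PR. The constructor usage mirrors the Flip6StandaloneClusterDescriptor diff above, and shutdown() appears in the RestClusterClient diffs later in this digest; the address and port values are made up:

{code}
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

public class RestEndpointSketch {
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, 6123);

RestClusterClient client = new RestClusterClient(config);
try {
// job submission and status lookups go through the REST
// operations listed in the description above
} finally {
client.shutdown();
}
}
}
{code}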



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread bowenli86
Github user bowenli86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142228578
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * A deployment descriptor for an existing cluster.
+ */
+public class Flip6StandaloneClusterDescriptor implements 
ClusterDescriptor {
+
+   private final Configuration config;
+
+   public Flip6StandaloneClusterDescriptor(Configuration config) {
+   this.config = config;
+   }
+
+   @Override
+   public String getClusterDescription() {
+   String host = config.getString(JobManagerOptions.ADDRESS, "");
+   int port = config.getInteger(JobManagerOptions.PORT, -1);
+   return "FLIP-6 Standalone cluster at " + host + ":" + port;
+   }
+
+   @Override
+   public RestClusterClient retrieve(String applicationID) {
+   try {
+   return new RestClusterClient(config);
+   } catch (Exception e) {
+   throw new RuntimeException("Couldn't retrieve 
standalone cluster", e);
--- End diff --

How about adding a `Flip6ClusterException` to wrap this exception?


---


[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss

2017-10-02 Thread Vijay Srinivasaraghavan (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16189049#comment-16189049 ]

Vijay Srinivasaraghavan commented on FLINK-7737:


Some observations on the usage of hflush() vs. hsync(): when an HCFS 
implementation is used as the backing filesystem, only hflush() is invoked, 
because hsync() is called only when the FSDataOutputStream is an instance of 
HdfsDataOutputStream. As a result, we are seeing data loss when the 
bucketing sink holds data in the pending state and tries to close the stream 
(as part of TaskManager failover recovery).

I see no issue in adding another condition so that hsync() is also called for 
generic HCFS streams (FSDataOutputStream). 

[~rmetzger] Could you please take a look?

hflush() - flushes all outstanding data (i.e. the current unfinished 
packet) from the client into the OS buffers on all DataNode replicas.

hsync() - flushes the data to the DataNodes, like hflush(), but should 
also force the data to the underlying physical storage via fsync (or equivalent).

https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
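Under that assumption, a minimal sketch of the adjusted helper (simplified: the reflective invocation in the actual StreamWriterBase#hflushOrSync is replaced by a direct hflush() call):

{code}
import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;

public class SyncHelper {
// Hedged sketch of the extra condition: after flushing, also hsync().
// FSDataOutputStream implements Syncable, so hsync() is available for
// HCFS streams too; whether it reaches physical storage depends on the
// underlying filesystem.
public static void hflushOrSync(FSDataOutputStream os) throws IOException {
os.hflush();
if (os instanceof HdfsDataOutputStream) {
((HdfsDataOutputStream) os).hsync(
EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
} else {
os.hsync();
}
}
}
{code}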

> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> -------------------------------------------------------------------------------
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Dev
>Reporter: Ryan Hobbs
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync(), which results in data loss.
> In the class *StreamWriterBase.java*, the code below executes hsync() if the 
> output stream is of type *HdfsDataOutputStream*, but not for streams of type 
> *FSDataOutputStream*. Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
> try {
> // At this point the refHflushOrSync cannot be null,
> // since register method would have thrown if it was.
> this.refHflushOrSync.invoke(os);
> if (os instanceof HdfsDataOutputStream) {
>   ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>   }
>   } catch (InvocationTargetException e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e.getCause());
> Throwable cause = e.getCause();
> if (cause != null && cause instanceof IOException) {
> throw (IOException) cause;
>   }
> throw new RuntimeException(msg, e);
>   } catch (Exception e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e);
> throw new RuntimeException(msg, e);
>   }
>   }
> {code}
> Could a potential fix be to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
>  if (os instanceof HdfsDataOutputStream) {
> ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
> os.hsync();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188860#comment-16188860 ]

ASF GitHub Bot commented on FLINK-7072:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142258814
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
+   }
+   // don't return just a JobSubmissionResult here, the signature 
is lying
+   // The CliFrontend expects this to be a JobExecutionResult
+
+   // TODO: do not exit this method until job is finished
+   return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
+   }
+
+   private void 

[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188861#comment-16188861 ]

ASF GitHub Bot commented on FLINK-7072:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142258856
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
+   }
+   // don't return just a JobSubmissionResult here, the signature 
is lying
+   // The CliFrontend expects this to be a JobExecutionResult
+
+   // TODO: do not exit this method until job is finished
+   return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
+   }
+
+   private void 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142258856
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
+   }
+   // don't return just a JobSubmissionResult here, the signature 
is lying
+   // The CliFrontend expects this to be a JobExecutionResult
+
+   // TODO: do not exit this method until job is finished
+   return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
+   }
+
+   private void submitJob(JobGraph jobGraph) throws JobSubmissionException 
{
+   log.info("Requesting blob server port.");
+   int blobServerPort;
+   try {
+   CompletableFuture 
portFuture = 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142258814
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
+   }
+
+   @Override
+   protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader 
classLoader) throws ProgramInvocationException {
+   log.info("Submitting job.");
+   try {
+   // temporary hack for FLIP-6
+   jobGraph.setAllowQueuedScheduling(true);
+   submitJob(jobGraph);
+   } catch (JobSubmissionException e) {
+   throw new RuntimeException(e);
+   }
+   // don't return just a JobSubmissionResult here, the signature 
is lying
+   // The CliFrontend expects this to be a JobExecutionResult
+
+   // TODO: do not exit this method until job is finished
+   return new JobExecutionResult(jobGraph.getJobID(), 1, 
Collections.emptyMap());
+   }
+
+   private void submitJob(JobGraph jobGraph) throws JobSubmissionException 
{
+   log.info("Requesting blob server port.");
+   int blobServerPort;
+   try {
+   CompletableFuture 
portFuture = 

[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188850#comment-16188850 ]

ASF GitHub Bot commented on FLINK-7072:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142257641
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
--- End diff --

We don't need to call super. The ClusterClient implementation shuts down 
the HA services/actorSystemLoader, which we don't use.


> Create RESTful cluster endpoint
> -------------------------------
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to communicate with the cluster from the RESTful client, we have to 
> implement a RESTful cluster endpoint. The endpoint shall support the 
> following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session mode)

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142257641
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobClient;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.client.JobSubmissionException;
+import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link ClusterClient} implementation that communicates via HTTP REST 
requests.
+ */
+public class RestClusterClient extends ClusterClient {
+
+   private RestClusterClientConfiguration configuration;
+   private final RestClient restEndpoint;
+
+   public RestClusterClient(Configuration config) throws Exception {
+   this(config, 
RestClusterClientConfiguration.fromConfiguration(config));
+   }
+
+   public RestClusterClient(Configuration config, 
RestClusterClientConfiguration configuration) throws Exception {
+   super(config);
+   this.configuration = configuration;
+   this.restEndpoint = new 
RestClient(configuration.getRestEndpointConfiguration(), 
Executors.newFixedThreadPool(4));
+   }
+
+   @Override
+   public void shutdown() {
+   this.restEndpoint.shutdown(Time.seconds(5));
--- End diff --

We don't need to call super. The ClusterClient implementation shuts down 
the HA services/actorSystemLoader, which we don't use.


---


[jira] [Commented] (FLINK-7371) user defined aggregator assumes nr of arguments smaller or equal than number of row fields

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188843#comment-16188843 ]

ASF GitHub Bot commented on FLINK-7371:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4736#discussion_r142254410
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
 ---
@@ -85,6 +85,46 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
   }
 
   @Test
+  def testOverWindowWithConstant(): Unit = {
+
+val data = List(
+  (1L, 1, "Hello"),
+  (2L, 2, "Hello"),
+  (3L, 3, "Hello"),
+  (4L, 4, "Hello"),
+  (5L, 5, "Hello"),
+  (6L, 6, "Hello"),
+  (7L, 7, "Hello World"),
+  (8L, 8, "Hello World"),
+  (8L, 8, "Hello World"),
+  (20L, 20, "Hello World"))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+StreamITCase.clear
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+val weightAvgFun = new WeightedAvg
+
+val windowedTable = table
+  .window(
+Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 
'w)
+  .select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg)
+  .select('c, 'wAvg)
--- End diff --

can be removed


> user defined aggregator assumes nr of arguments smaller or equal than number 
> of row fields
> --------------------------------------------------------------------------------------
>
> Key: FLINK-7371
> URL: https://issues.apache.org/jira/browse/FLINK-7371
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Stefano Bortoli
>Assignee: Timo Walther
>
> Defining a user-defined aggregation with more parameters than the row has 
> fields causes an ArrayIndexOutOfBoundsException, because the indexing is 
> based on a linear iteration over the row fields. This does not cover cases 
> where fields are used more than once or where constant values are passed to 
> the aggregation function.
> For example:
> {code}
> window(partition {} order by [2] rows between $5 PRECEDING and CURRENT ROW 
> aggs [myAgg($0, $1, $3, $0, $4)])
> {code}
> where $3 and $4 are references to constants and $0 and $1 are fields, this causes:
> {code}
> java.lang.ArrayIndexOutOfBoundsException: 4
>   at 
> org.apache.flink.table.plan.schema.RowSchema.mapIndex(RowSchema.scala:134)
>   at 
> org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
>   at 
> org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.flink.table.plan.schema.RowSchema.mapAggregateCall(RowSchema.scala:147)
>   at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate$$anonfun$9.apply(DataStreamOverAggregate.scala:362)
> {code}
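To make the failure mode concrete, here is an illustrative sketch (hypothetical arrays; the real mapping lives in RowSchema.mapIndex):

{code}
// Illustrative only: an index mapping sized to the physical row, as the
// linear iteration over row fields conceptually builds it. With three row
// fields, the constant operands $3 and $4 have no entry, so looking them
// up throws ArrayIndexOutOfBoundsException.
int[] logicalToPhysical = {0, 1, 2};      // one entry per row field
int[] aggCallOperands = {0, 1, 3, 0, 4};  // myAgg($0, $1, $3, $0, $4)
for (int operand : aggCallOperands) {
int physical = logicalToPhysical[operand]; // fails for operands 3 and 4
}
{code}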



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7371) user defined aggregator assumes nr of arguments smaller or equal than number of row fields

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188842#comment-16188842 ]

ASF GitHub Bot commented on FLINK-7371:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4736#discussion_r142256742
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -86,6 +86,13 @@ public Long getValue(WeightedAvgAccum accumulator) {
}
 
// overloaded accumulate method
+   // dummy to test constants
+   public void accumulate(WeightedAvgAccum accumulator, long 
iValue, int iWeight, int x, String string) {
+   accumulator.sum += iWeight + Integer.parseInt(string);
--- End diff --

change the method to 
```
accumulator.sum += (iValue + Integer.parseInt(string)) * iWeight;
accumulator.count += iWeight;
``` 

so that the value of `string` has some influence on the result?
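For reference, the full overloaded method with that change applied might read:

```
// Suggested variant: the parsed string now contributes to the weighted value,
// so its content influences the result; x remains an unused dummy constant.
public void accumulate(WeightedAvgAccum accumulator, long iValue, int iWeight, int x, String string) {
    accumulator.sum += (iValue + Integer.parseInt(string)) * iWeight;
    accumulator.count += iWeight;
}
```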


> user defined aggregator assumes nr of arguments smaller or equal than number 
> of row fields
> --------------------------------------------------------------------------------------
>
> Key: FLINK-7371
> URL: https://issues.apache.org/jira/browse/FLINK-7371
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Stefano Bortoli
>Assignee: Timo Walther
>
> Defining a user-defined aggregation with more parameters than the row has 
> fields causes an ArrayIndexOutOfBoundsException, because the indexing is 
> based on a linear iteration over the row fields. This does not cover cases 
> where fields are used more than once or where constant values are passed to 
> the aggregation function.
> For example:
> {code}
> window(partition {} order by [2] rows between $5 PRECEDING and CURRENT ROW 
> aggs [myAgg($0, $1, $3, $0, $4)])
> {code}
> where $3 and $4 are references to constants and $0 and $1 are fields, this causes:
> {code}
> java.lang.ArrayIndexOutOfBoundsException: 4
>   at 
> org.apache.flink.table.plan.schema.RowSchema.mapIndex(RowSchema.scala:134)
>   at 
> org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
>   at 
> org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.flink.table.plan.schema.RowSchema.mapAggregateCall(RowSchema.scala:147)
>   at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate$$anonfun$9.apply(DataStreamOverAggregate.scala:362)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7371) user defined aggregator assumes nr of arguments smaller or equal than number of row fields

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-7371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188844#comment-16188844 ]

ASF GitHub Bot commented on FLINK-7371:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4736#discussion_r142247401
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1670,4 +1670,34 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable constant to the member area of the generated 
[[Function]].
+*
+* @param constant constant expression
+* @return member variable term
+*/
+  def addReusableBoxedConstant(constant: GeneratedExpression): String = {
+require(constant.literal, "Literal expected")
+
+val fieldTerm = newName("constant")
+
+val boxed = generateOutputFieldBoxing(constant)
+val boxedType = boxedTypeTermForTypeInfo(boxed.resultType)
+
+val field =
+  s"""
+|transient $boxedType $fieldTerm;
--- End diff --

why `transient`? Couldn't this be `final`?


> user defined aggregator assumes nr of arguments smaller or equal than number 
> of row fields
> --------------------------------------------------------------------------------------
>
> Key: FLINK-7371
> URL: https://issues.apache.org/jira/browse/FLINK-7371
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.1
>Reporter: Stefano Bortoli
>Assignee: Timo Walther
>
> Defining a user-defined aggregation with more parameters than the row has 
> fields causes an ArrayIndexOutOfBoundsException, because the indexing is 
> based on a linear iteration over the row fields. This does not cover cases 
> where fields are used more than once or where constant values are passed to 
> the aggregation function.
> For example:
> {code}
> window(partition {} order by [2] rows between $5 PRECEDING and CURRENT ROW 
> aggs [myAgg($0, $1, $3, $0, $4)])
> {code}
> where $3 and $4 are references to constants and $0 and $1 are fields, this causes:
> {code}
> java.lang.ArrayIndexOutOfBoundsException: 4
>   at 
> org.apache.flink.table.plan.schema.RowSchema.mapIndex(RowSchema.scala:134)
>   at 
> org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
>   at 
> org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at 
> org.apache.flink.table.plan.schema.RowSchema.mapAggregateCall(RowSchema.scala:147)
>   at 
> org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate$$anonfun$9.apply(DataStreamOverAggregate.scala:362)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4736#discussion_r142254410
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala
 ---
@@ -85,6 +85,46 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
   }
 
   @Test
+  def testOverWindowWithConstant(): Unit = {
+
+val data = List(
+  (1L, 1, "Hello"),
+  (2L, 2, "Hello"),
+  (3L, 3, "Hello"),
+  (4L, 4, "Hello"),
+  (5L, 5, "Hello"),
+  (6L, 6, "Hello"),
+  (7L, 7, "Hello World"),
+  (8L, 8, "Hello World"),
+  (8L, 8, "Hello World"),
+  (20L, 20, "Hello World"))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setParallelism(1)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+StreamITCase.clear
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+val weightAvgFun = new WeightedAvg
+
+val windowedTable = table
+  .window(
+Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 
'w)
+  .select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg)
+  .select('c, 'wAvg)
--- End diff --

can be removed


---


[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4736#discussion_r142247401
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 ---
@@ -1670,4 +1670,34 @@ abstract class CodeGenerator(
 
 fieldTerm
   }
+
+  /**
+* Adds a reusable constant to the member area of the generated 
[[Function]].
+*
+* @param constant constant expression
+* @return member variable term
+*/
+  def addReusableBoxedConstant(constant: GeneratedExpression): String = {
+require(constant.literal, "Literal expected")
+
+val fieldTerm = newName("constant")
+
+val boxed = generateOutputFieldBoxing(constant)
+val boxedType = boxedTypeTermForTypeInfo(boxed.resultType)
+
+val field =
+  s"""
+|transient $boxedType $fieldTerm;
--- End diff --

why `transient`? Couldn't this be `final`?


---


[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4736#discussion_r142256742
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
@@ -86,6 +86,13 @@ public Long getValue(WeightedAvgAccum accumulator) {
}
 
// overloaded accumulate method
+   // dummy to test constants
+   public void accumulate(WeightedAvgAccum accumulator, long iValue, int iWeight, int x, String string) {
+   accumulator.sum += iWeight + Integer.parseInt(string);
--- End diff --

change the method to 
```
accumulator.sum += (iValue + Integer.parseInt(string)) * iWeight;
accumulator.count += iWeight;
``` 

so that the value of `string` has some influence on the result?
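
For reference, a self-contained sketch of the full suggested overload; `WeightedAvgAccum` is the accumulator class from `JavaUserDefinedAggFunctions`, and the body simply restates the snippet above:
```java
public void accumulate(WeightedAvgAccum accumulator, long iValue, int iWeight, int x, String string) {
	// let both iValue and string influence the weighted sum
	accumulator.sum += (iValue + Integer.parseInt(string)) * iWeight;
	accumulator.count += iWeight;
}
```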


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142255730
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * A deployment descriptor for an existing cluster.
+ */
+public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor {
+
+   private final Configuration config;
+
+   public Flip6StandaloneClusterDescriptor(Configuration config) {
+   this.config = config;
+   }
+
+   @Override
+   public String getClusterDescription() {
+   String host = config.getString(JobManagerOptions.ADDRESS, "");
+   int port = config.getInteger(JobManagerOptions.PORT, -1);
+   return "FLIP-6 Standalone cluster at " + host + ":" + port;
+   }
+
+   @Override
+   public RestClusterClient retrieve(String applicationID) {
--- End diff --

it's always null.
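
A hedged sketch of how that observation could be made explicit; `checkArgument` is `org.apache.flink.util.Preconditions.checkArgument`, and `createRestClusterClient()` is a hypothetical helper, not code from this PR:
```java
@Override
public RestClusterClient retrieve(String applicationID) {
	// standalone clusters have no application id, so fail fast on misuse
	checkArgument(applicationID == null, "Standalone clusters do not have an application id.");
	return createRestClusterClient(); // assumed helper that builds the client from 'config'
}
```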


---


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188830#comment-16188830
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142255730
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment;
+
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * A deployment descriptor for an existing cluster.
+ */
+public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor {
+
+   private final Configuration config;
+
+   public Flip6StandaloneClusterDescriptor(Configuration config) {
+   this.config = config;
+   }
+
+   @Override
+   public String getClusterDescription() {
+   String host = config.getString(JobManagerOptions.ADDRESS, "");
+   int port = config.getInteger(JobManagerOptions.PORT, -1);
+   return "FLIP-6 Standalone cluster at " + host + ":" + port;
+   }
+
+   @Override
+   public RestClusterClient retrieve(String applicationID) {
--- End diff --

it's always null.


> Create RESTful cluster endpoint
> ---
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to communicate with the cluster from the RESTful client, we have to 
> implement a RESTful cluster endpoint. The endpoint shall support the 
> following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Lookup job leader (GET): Gets the JM leader for the given job
> This endpoint will run in session mode alongside the dispatcher/session 
> runner and forward calls to this component which maintains a view on all 
> currently executed jobs.
> In the per-job mode, the endpoint will return only the single running job and 
> the address of the JobManager alongside which it is running. Furthermore, it 
> won't accept job submissions.
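
As a rough sketch, the four operations could map onto routes like the following; the concrete paths are illustrative assumptions, not Flink's actual REST specification:
{code:java}
// Hypothetical route constants for the operations listed above.
interface ClusterEndpointRoutes {
	String LIST_JOBS  = "GET  /jobs";               // list all running jobs
	String SUBMIT_JOB = "POST /jobs";               // session mode only
	String JOB_STATUS = "GET  /jobs/:jobid";        // status and JobExecutionResult
	String JOB_LEADER = "GET  /jobs/:jobid/leader"; // JM leader lookup
}
{code}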



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188804#comment-16188804
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142251500
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class);
+   private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+           when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+           when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
+
+   Configuration config = new Configuration();
+   config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+   RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+   TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler();
+   TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
+   TestJobTerminationHandler 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142251500
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class);
+   private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+           when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+           when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
+
+   Configuration config = new Configuration();
+   config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+   RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+   TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler();
+   TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
+   TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();
+
+   RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+   @Override
+   protected 

[jira] [Commented] (FLINK-7695) Port JobConfigHandler to new REST endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188767#comment-16188767
 ] 

ASF GitHub Bot commented on FLINK-7695:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4737


> Port JobConfigHandler to new REST endpoint
> --
>
> Key: FLINK-7695
> URL: https://issues.apache.org/jira/browse/FLINK-7695
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port the {{JobConfigHandler}} to the new {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7668) Add AccessExecutionGraph refresh interval to ExecutionGraphHolder

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188768#comment-16188768
 ] 

ASF GitHub Bot commented on FLINK-7668:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4728


> Add AccessExecutionGraph refresh interval to ExecutionGraphHolder
> -
>
> Key: FLINK-7668
> URL: https://issues.apache.org/jira/browse/FLINK-7668
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Once we support offline {{AccessExecutionGraph}} implementation (see 
> FLINK-7667) we should add a refresh interval to the {{ExecutionGraphHolder}} 
> after which the {{AccessExecutionGraph}} is retrieved again from the 
> {{JobMaster}}.
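
A minimal sketch of the refresh-interval idea, assuming a simple TTL map; illustrative only, not the actual ExecutionGraphHolder/ExecutionGraphCache implementation:
{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Entries older than the TTL are re-fetched; fetchFromJobMaster stands in
// for the request to the JobMaster.
class ExecutionGraphCacheSketch<K, G> {

	private static final class Entry<G> {
		final G graph;
		final long validUntil;
		Entry(G graph, long validUntil) { this.graph = graph; this.validUntil = validUntil; }
	}

	private final ConcurrentHashMap<K, Entry<G>> cache = new ConcurrentHashMap<>();
	private final long ttlMillis;

	ExecutionGraphCacheSketch(long ttlMillis) { this.ttlMillis = ttlMillis; }

	G get(K jobId, Function<K, G> fetchFromJobMaster) {
		Entry<G> entry = cache.get(jobId);
		if (entry == null || entry.validUntil < System.currentTimeMillis()) {
			G fresh = fetchFromJobMaster.apply(jobId);
			cache.put(jobId, new Entry<>(fresh, System.currentTimeMillis() + ttlMillis));
			return fresh;
		}
		return entry.graph;
	}
}
{code}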



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7668) Add AccessExecutionGraph refresh interval to ExecutionGraphHolder

2017-10-02 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7668.

   Resolution: Fixed
Fix Version/s: 1.4.0

Added via aae417f113d0f9db3ac3b4cbadd378134f30b440

> Add AccessExecutionGraph refresh interval to ExecutionGraphHolder
> -
>
> Key: FLINK-7668
> URL: https://issues.apache.org/jira/browse/FLINK-7668
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Once we support offline {{AccessExecutionGraph}} implementation (see 
> FLINK-7667) we should add a refresh interval to the {{ExecutionGraphHolder}} 
> after which the {{AccessExecutionGraph}} is retrieved again from the 
> {{JobMaster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4728: [FLINK-7668] Add ExecutionGraphCache for Execution...

2017-10-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4728


---


[jira] [Commented] (FLINK-7710) Port CheckpointStatsHandler to new REST endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188766#comment-16188766
 ] 

ASF GitHub Bot commented on FLINK-7710:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4750


> Port CheckpointStatsHandler to new REST endpoint
> 
>
> Key: FLINK-7710
> URL: https://issues.apache.org/jira/browse/FLINK-7710
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CheckpointStatsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7708) Port CheckpointConfigHandler to new REST endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188769#comment-16188769
 ] 

ASF GitHub Bot commented on FLINK-7708:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4744


> Port CheckpointConfigHandler to new REST endpoint
> -
>
> Key: FLINK-7708
> URL: https://issues.apache.org/jira/browse/FLINK-7708
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CheckpointConfigHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4744: [FLINK-7708] [flip6] Add CheckpointConfigHandler f...

2017-10-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4744


---


[GitHub] flink pull request #4750: [FLINK-7710] [flip6] Add CheckpointStatisticsHandl...

2017-10-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4750


---


[GitHub] flink pull request #4737: [FLINK-7695] [flip6] Add JobConfigHandler for new ...

2017-10-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4737


---


[jira] [Closed] (FLINK-7695) Port JobConfigHandler to new REST endpoint

2017-10-02 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7695.

   Resolution: Fixed
Fix Version/s: 1.4.0

Added via 172a64c1488bd6edda97473562c6871ae7f3364d

> Port JobConfigHandler to new REST endpoint
> --
>
> Key: FLINK-7695
> URL: https://issues.apache.org/jira/browse/FLINK-7695
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port the {{JobConfigHandler}} to the new {{RestServerEndpoint}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7710) Port CheckpointStatsHandler to new REST endpoint

2017-10-02 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7710.
--
Resolution: Fixed

Added via ac82becd21b7766c18d16abfc7e08334c644507e

> Port CheckpointStatsHandler to new REST endpoint
> 
>
> Key: FLINK-7710
> URL: https://issues.apache.org/jira/browse/FLINK-7710
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CheckpointStatsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (FLINK-7708) Port CheckpointConfigHandler to new REST endpoint

2017-10-02 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7708.
--
Resolution: Fixed

Added via b41f5a66cd6d62bf3c271f1d0bf9d8fa50a5d410

> Port CheckpointConfigHandler to new REST endpoint
> -
>
> Key: FLINK-7708
> URL: https://issues.apache.org/jira/browse/FLINK-7708
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CheckpointConfigHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7678) SQL UserDefineTableFunction does not take CompositeType input correctly

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188727#comment-16188727
 ] 

ASF GitHub Bot commented on FLINK-7678:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4726#discussion_r142238899
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
 ---
@@ -89,6 +90,14 @@ class UserDefinedScalarFunctionTest extends 
ExpressionTestBase {
   "Nullable(f0)",
   "Nullable(f0)",
   "42")
+
+// test row type input
+testAllApis(
+  Func19('f11),
+  "Func19(f11)",
+  "Func19(f11)",
+  "12,true,1,2,3"
--- End diff --

is the result of a scalar function automatically flattened?


> SQL UserDefineTableFunction does not take CompositeType input correctly
> ---
>
> Key: FLINK-7678
> URL: https://issues.apache.org/jira/browse/FLINK-7678
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.2
>Reporter: Rong Rong
>Assignee: Timo Walther
>
> UDF is using FlinkTypeFactory to infer operand type while UDTF does not go 
> through the same code path. This results in:
> {code:console}
> org.apache.flink.table.api.ValidationException: SQL validation failed. From 
> line 1, column 38 to line 1, column 44: No match found for function signature 
> func() 
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 38 to line 1, column 44: No match found for function signature 
> func()
> {code}
> Please see github code for more info:
> https://github.com/walterddr/flink/blob/bug_report/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/UDTFCompositeTypeTestFailure.scala



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7678) SQL UserDefineTableFunction does not take CompositeType input correctly

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188729#comment-16188729
 ] 

ASF GitHub Bot commented on FLINK-7678:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4726#discussion_r142238516
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
 ---
@@ -210,6 +213,31 @@ class CorrelateITCase extends 
StreamingMultipleProgramsTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testRowType(): Unit = {
+val row = Row.of(
+  12.asInstanceOf[Integer],
+  true.asInstanceOf[JBoolean],
+  Row.of(1.asInstanceOf[Integer], 2.asInstanceOf[Integer], 3.asInstanceOf[Integer])
+)
+
+val rowType = Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
+val in = env.fromElements(row, row)(rowType).toTable(tEnv).as('a, 'b, 'c)
+
+val tableFunc4 = new TableFunc4()
+val result = in
+  .join(tableFunc4('c) as ('f0, 'f1, 'f2))
+  .select('c, 'f2)
+
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = mutable.MutableList(
+  "1,2,3,3",
--- End diff --

is this the correct result? Shouldn't `'c` remain nested? We did not ask to 
flatten it.


> SQL UserDefineTableFunction does not take CompositeType input correctly
> ---
>
> Key: FLINK-7678
> URL: https://issues.apache.org/jira/browse/FLINK-7678
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.2
>Reporter: Rong Rong
>Assignee: Timo Walther
>
> UDF is using FlinkTypeFactory to infer operand type while UDTF does not go 
> through the same code path. This results in:
> {code:console}
> org.apache.flink.table.api.ValidationException: SQL validation failed. From 
> line 1, column 38 to line 1, column 44: No match found for function signature 
> func() 
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 38 to line 1, column 44: No match found for function signature 
> func()
> {code}
> Please see github code for more info:
> https://github.com/walterddr/flink/blob/bug_report/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/UDTFCompositeTypeTestFailure.scala



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7678) SQL UserDefineTableFunction does not take CompositeType input correctly

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188728#comment-16188728
 ] 

ASF GitHub Bot commented on FLINK-7678:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4726#discussion_r142237447
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
 ---
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
--- End diff --

Undo?


> SQL UserDefineTableFunction does not take CompositeType input correctly
> ---
>
> Key: FLINK-7678
> URL: https://issues.apache.org/jira/browse/FLINK-7678
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.2
>Reporter: Rong Rong
>Assignee: Timo Walther
>
> UDF is using FlinkTypeFactory to infer operand type while UDTF does not go 
> through the same code path. This results in:
> {code:console}
> org.apache.flink.table.api.ValidationException: SQL validation failed. From 
> line 1, column 38 to line 1, column 44: No match found for function signature 
> func() 
> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, 
> column 38 to line 1, column 44: No match found for function signature 
> func()
> {code}
> Please see github code for more info:
> https://github.com/walterddr/flink/blob/bug_report/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/UDTFCompositeTypeTestFailure.scala



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4726#discussion_r142237447
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
 ---
@@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp}
 
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.{TableEnvironment, Types}
--- End diff --

Undo?


---


[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4726#discussion_r142238899
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
 ---
@@ -89,6 +90,14 @@ class UserDefinedScalarFunctionTest extends 
ExpressionTestBase {
   "Nullable(f0)",
   "Nullable(f0)",
   "42")
+
+// test row type input
+testAllApis(
+  Func19('f11),
+  "Func19(f11)",
+  "Func19(f11)",
+  "12,true,1,2,3"
--- End diff --

is the result of a scalar function automatically flattened?


---


[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4726#discussion_r142238516
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
 ---
@@ -210,6 +213,31 @@ class CorrelateITCase extends 
StreamingMultipleProgramsTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testRowType(): Unit = {
+val row = Row.of(
+  12.asInstanceOf[Integer],
+  true.asInstanceOf[JBoolean],
+  Row.of(1.asInstanceOf[Integer], 2.asInstanceOf[Integer], 3.asInstanceOf[Integer])
+)
+
+val rowType = Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
+val in = env.fromElements(row, row)(rowType).toTable(tEnv).as('a, 'b, 'c)
+
+val tableFunc4 = new TableFunc4()
+val result = in
+  .join(tableFunc4('c) as ('f0, 'f1, 'f2))
+  .select('c, 'f2)
+
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = mutable.MutableList(
+  "1,2,3,3",
--- End diff --

is this the correct result? Shouldn't `'c` remain nested? We did not ask to 
flatten it.


---


[jira] [Created] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins

2017-10-02 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-7755:


 Summary: Null values are not correctly handled by batch inner and 
outer joins
 Key: FLINK-7755
 URL: https://issues.apache.org/jira/browse/FLINK-7755
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Affects Versions: 1.3.2, 1.4.0
Reporter: Fabian Hueske
Priority: Blocker
 Fix For: 1.4.0, 1.3.3


Join predicates of batch joins are not correctly evaluated according to 
three-value logic.
This affects inner as well as outer joins.

The problem is that some equality predicates are only evaluated by the internal 
join algorithms of Flink, which are based on {{TypeComparator}}. The field 
{{TypeComparator}}s for {{Row}} are implemented such that {{null == null}} 
evaluates to {{TRUE}} to ensure correct ordering and grouping. However, 
three-value logic requires that {{null == null}} evaluates to {{UNKNOWN}} (or 
null). The code generator implements this logic correctly, but for equality 
predicates, no code is generated.

For outer joins, the problem is a bit trickier because these do not support 
code-generated predicates yet (see FLINK-5520). FLINK-5498 proposes a solution 
for this issue.

We also need to extend several of the existing tests and add null values to 
ensure that the join logic is correctly implemented. 
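
To make the mismatch concrete, a minimal sketch in plain Java (not Flink code) of the two equality semantics described above:
{code:java}
public class NullEqualitySketch {

	// comparator semantics used for sorting/grouping: null == null is true
	static boolean comparatorEquals(Object a, Object b) {
		return (a == null && b == null) || (a != null && a.equals(b));
	}

	// SQL three-value logic: any comparison involving null is UNKNOWN (here: null)
	static Boolean sqlEquals(Object a, Object b) {
		if (a == null || b == null) {
			return null;
		}
		return a.equals(b);
	}

	public static void main(String[] args) {
		System.out.println(comparatorEquals(null, null)); // true
		System.out.println(sqlEquals(null, null));        // null, i.e. UNKNOWN
	}
}
{code}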




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7754) Complete termination future after actor has been stopped.

2017-10-02 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7754:


 Summary: Complete termination future after actor has been stopped.
 Key: FLINK-7754
 URL: https://issues.apache.org/jira/browse/FLINK-7754
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann


At the moment, we complete the termination future when the {{postStop}} method 
of the {{RpcActor}} has been executed. This, however, does not mean that the 
underlying actor has been stopped. We should rather complete the future in the 
{{AkkaRpcService#stopServer}} method where we close the actor with a graceful 
shutdown.
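
A hedged sketch of the proposed flow; gracefulShutdown() is a stand-in for the actual Akka graceful stop and is assumed to complete only once the actor is really gone:
{code:java}
import java.util.concurrent.CompletableFuture;

class StopServerSketch {

	private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

	// stand-in for Akka's graceful actor stop
	CompletableFuture<Void> gracefulShutdown() {
		return CompletableFuture.completedFuture(null);
	}

	void stopServer() {
		// complete the termination future only after the actor has been
		// stopped, not already at the end of postStop()
		gracefulShutdown().whenComplete((ignored, failure) -> {
			if (failure != null) {
				terminationFuture.completeExceptionally(failure);
			} else {
				terminationFuture.complete(null);
			}
		});
	}
}
{code}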



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7753) HandlerUtils should close the channel on error responses

2017-10-02 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-7753:
---

 Summary: HandlerUtils should close the channel on error responses
 Key: FLINK-7753
 URL: https://issues.apache.org/jira/browse/FLINK-7753
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 
Assignee: Eron Wright 
Priority: Minor


Unexpected errors in the server pipeline correctly cause a 500 error response. I
suggest that such responses also close the channel rather than allowing
keep-alive. This would also be a better security posture, since we don't know
whether the pipeline is corrupt following an unexpected error.
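
A minimal sketch of the suggestion using the shaded Netty API that the REST handlers already use; this is illustrative, not the actual HandlerUtils code:
{code:java}
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;

public final class ErrorResponseSketch {

	// write the error response and close the channel once the write
	// completes, instead of keeping the possibly corrupted pipeline alive
	static void sendErrorAndClose(ChannelHandlerContext ctx, HttpResponse errorResponse) {
		ctx.writeAndFlush(errorResponse).addListener(ChannelFutureListener.CLOSE);
	}

	private ErrorResponseSketch() {}
}
{code}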



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7752) RedirectHandler should execute on the IO thread

2017-10-02 Thread Eron Wright (JIRA)
Eron Wright  created FLINK-7752:
---

 Summary: RedirectHandler should execute on the IO thread
 Key: FLINK-7752
 URL: https://issues.apache.org/jira/browse/FLINK-7752
 Project: Flink
  Issue Type: Sub-task
Reporter: Eron Wright 
Priority: Minor


The redirect handler executes much of its logic (including 'respondAsLeader') 
on an arbitrary thread, but it would be cleaner to execute on the I/O thread by 
using `channelHandlerContext.executor()`.   Note that Netty allows writes on 
any thread but reads on the I/O thread.   
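
A minimal sketch of the suggested hop onto the I/O thread; the names are illustrative, not the actual RedirectHandler code:
{code:java}
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;

public final class IoThreadSketch {

	// run respondAsLeader() on the channel's event loop, regardless of
	// which thread completed the leader-retrieval future
	static void respondOnIoThread(ChannelHandlerContext ctx, Runnable respondAsLeader) {
		if (ctx.executor().inEventLoop()) {
			respondAsLeader.run();
		} else {
			ctx.executor().execute(respondAsLeader);
		}
	}

	private IoThreadSketch() {}
}
{code}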



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188511#comment-16188511
 ] 

ASF GitHub Bot commented on FLINK-7709:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4763

[FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

## What is the purpose of the change

Adds the new `CheckpointStatisticDetailsHandler` for the new REST server 
endpoint.

Moreover, this PR disables the `FAIL_ON_MISSING_CREATOR_PROPERTIES` 
property for the `RestMapperUtils.getStrictObjectMapper` because that is 
something the individual beans can do on their own (e.g. by checking with 
`Preconditions.checkNotNull`).

R @zentol because of the changes to the `ObjectMapper` setup.

## Verifying this change

This change is already covered by existing tests, such as 
`CheckpointingStatisticsTest`.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink portCheckpointStatsDetailsHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4763.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4763


commit c09e579b26752f2ae477d60e8fbb6ccdce315df9
Author: Till Rohrmann 
Date:   2017-09-25T13:29:59Z

[FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST handlers

The ExecutionGraphCache replaces the ExecutionGraphHolder. Unlike the 
latter, the former
does not expect the AccessExecutionGraph to be the true ExecutionGraph. 
Instead it assumes
it to be the ArchivedExecutionGraph. Therefore, it invalidates the cache 
entries after
a given time to live period. This will trigger requesting the 
AccessExecutionGraph again
and, thus, updating the ExecutionGraph information for the ExecutionGraph 
based REST
handlers.

In order to avoid memory leaks, the WebRuntimeMonitor now starts a periodic
cleanup task which triggers ExecutionGraphCache.cleanup. This method releases
all cache entries which have exceeded their time to live. Currently it is set
to 20 * refreshInterval of the web gui.

This closes #4728.

commit a4f9ef81c02738b40cf0ba375650684b46f5417d
Author: Till Rohrmann 
Date:   2017-09-26T16:39:15Z

[FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint

This closes #4737.

commit 4259fcc96c7c72644806941bd0df9f508a2f0bcd
Author: Till Rohrmann 
Date:   2017-09-28T16:35:50Z

[FLINK-7708] [flip6] Add CheckpointConfigHandler for new REST endpoint

This commit implements the CheckpointConfigHandler which now returns a
CheckpointConfigInfo object if checkpointing is enabled. If checkpointing
is disabled for a job, it returns a 404 response.

commit 6f1756b541ad76521c7b653f834c1032c623f1e6
Author: Till Rohrmann 
Date:   2017-09-29T13:09:06Z

[FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST 
endpoint

This commit also makes the CheckpointStatsHistory object serializable by 
removing the
CheckpointStatsHistoryIterable and replacing it with a static ArrayList.

commit bd8109b0af1a90fc32f76e57261a252762d678eb
Author: Till Rohrmann 
Date:   2017-10-02T17:39:38Z

[FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

commit 4d694411e9e4b6c508c258655c9c69cb26ddb6be
Author: Till Rohrmann 
Date:   2017-10-02T17:52:13Z

Disable failing when not all creator properties are known




> Port CheckpointStatsDetailsHandler to new REST endpoint
> ---
>
> Key: FLINK-7709
> URL: https://issues.apache.org/jira/browse/FLINK-7709
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> 

[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...

2017-10-02 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4763

[FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

## What is the purpose of the change

Adds the new `CheckpointStatisticDetailsHandler` for the new REST server 
endpoint.

Moreover, this PR disables the `FAIL_ON_MISSING_CREATOR_PROPERTIES` 
property for the `RestMapperUtils.getStrictObjectMapper` because that is 
something the individual beans can do on their own (e.g. by checking with 
`Preconditions.checkNotNull`).

R @zentol because of the changes to the `ObjectMapper` setup.

## Verifying this change

This change is already covered by existing tests, such as 
`CheckpointingStatisticsTest`.


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink portCheckpointStatsDetailsHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4763.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4763


commit c09e579b26752f2ae477d60e8fbb6ccdce315df9
Author: Till Rohrmann 
Date:   2017-09-25T13:29:59Z

[FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST handlers

The ExecutionGraphCache replaces the ExecutionGraphHolder. Unlike the 
latter, the former
does not expect the AccessExecutionGraph to be the true ExecutionGraph. 
Instead it assumes
it to be the ArchivedExecutionGraph. Therefore, it invalidates the cache 
entries after
a given time to live period. This will trigger requesting the 
AccessExecutionGraph again
and, thus, updating the ExecutionGraph information for the ExecutionGraph 
based REST
handlers.

In order to avoid memory leaks, the WebRuntimeMonitor now starts a periodic
cleanup task which triggers ExecutionGraphCache.cleanup. This method releases
all cache entries which have exceeded their time to live. Currently it is set
to 20 * refreshInterval of the web gui.

This closes #4728.

commit a4f9ef81c02738b40cf0ba375650684b46f5417d
Author: Till Rohrmann 
Date:   2017-09-26T16:39:15Z

[FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint

This closes #4737.

commit 4259fcc96c7c72644806941bd0df9f508a2f0bcd
Author: Till Rohrmann 
Date:   2017-09-28T16:35:50Z

[FLINK-7708] [flip6] Add CheckpointConfigHandler for new REST endpoint

This commit implements the CheckpointConfigHandler which now returns a
CheckpointConfigInfo object if checkpointing is enabled. If checkpointing
is disabled for a job, it returns a 404 response.

commit 6f1756b541ad76521c7b653f834c1032c623f1e6
Author: Till Rohrmann 
Date:   2017-09-29T13:09:06Z

[FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST 
endpoint

This commit also makes the CheckpointStatsHistory object serializable by 
removing the
CheckpointStatsHistoryIterable and replacing it with a static ArrayList.

commit bd8109b0af1a90fc32f76e57261a252762d678eb
Author: Till Rohrmann 
Date:   2017-10-02T17:39:38Z

[FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

commit 4d694411e9e4b6c508c258655c9c69cb26ddb6be
Author: Till Rohrmann 
Date:   2017-10-02T17:52:13Z

Disable failing when not all creator properties are known




---


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188491#comment-16188491
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142207196
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class);
+   private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+           when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+           when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
+
+   Configuration config = new Configuration();
+   config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+   RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+   TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler();
+   TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
+   

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142207196
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class);
+   private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+           when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+           when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
+
+   Configuration config = new Configuration();
+   config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+   RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config);
+
+   TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler();
+   TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
+   TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler();
+
+   RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+   @Override
+   protected 

[GitHub] flink pull request #4732: [FLINK-7426] [table] Support null values in keys

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4732#discussion_r142165496
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * Null-aware key selector.
+  */
+class CRowKeySelector(
+val keyFields: Array[Int],
+@transient var returnType: TypeInformation[Row])
+  extends KeySelector[CRow, Row]
+  with ResultTypeQueryable[Row] {
+
+  override def getKey(value: CRow): Row = {
+val row = value.row
+val fields = keyFields
+
+val newKey = new Row(fields.length)
+var i = 0
+while (i < fields.length) {
+  newKey.setField(i, row.getField(fields(i)))
--- End diff --

should we add a `project(int[])` method to `Row` similar to `Row.copy()` to 
reduce the number of method calls?
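
For illustration, such a method could look like the following; it is sketched as a static helper here (the real method would live inside `Row`, next to `copy()`), and is an assumption about the proposal, not existing Flink API:
```java
import org.apache.flink.types.Row;

public final class RowProjectSketch {

	// copy the selected fields into a new Row in a single call,
	// mirroring the selector loop above
	static Row project(Row row, int[] fields) {
		Row projected = new Row(fields.length);
		for (int i = 0; i < fields.length; i++) {
			projected.setField(i, row.getField(fields[i]));
		}
		return projected;
	}

	private RowProjectSketch() {}
}
```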


---


[jira] [Commented] (FLINK-7426) Table API does not support null values in keys

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188421#comment-16188421
 ] 

ASF GitHub Bot commented on FLINK-7426:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4732#discussion_r142167389
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * Null-aware key selector.
+  */
+class CRowKeySelector(
+val keyFields: Array[Int],
+@transient var returnType: TypeInformation[Row])
+  extends KeySelector[CRow, Row]
+  with ResultTypeQueryable[Row] {
+
+  override def getKey(value: CRow): Row = {
+val row = value.row
+val fields = keyFields
+
+val newKey = new Row(fields.length)
+var i = 0
+while (i < fields.length) {
+  newKey.setField(i, row.getField(fields(i)))
--- End diff --

if we do that, we can also make `Row.copy()` more efficient by adding a 
private constructor that receives an `Object[]` and implementing `copy()` as 
`return new Row(Arrays.copyOf(row.fields, row.fields.length));`


> Table API does not support null values in keys
> --
>
> Key: FLINK-7426
> URL: https://issues.apache.org/jira/browse/FLINK-7426
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.2
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> The Table API uses {{keyBy}} internally, however, the generated 
> {{KeySelector}} uses instances of {{Tuple}}. The {{TupleSerializer}} is not 
> able to serialize null values. This causes issues during checkpointing or 
> when using the RocksDB state backend. We need to replace all {{keyBy}} calls 
> with a custom {{RowKeySelector}}.
> {code}
> class AggregateITCase extends StreamingWithStateTestBase {
>   private val queryConfig = new StreamQueryConfig()
>   queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
>   @Test
>   def testDistinct(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStateBackend(getStateBackend)
> val tEnv = TableEnvironment.getTableEnvironment(env)
> StreamITCase.clear
> val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
>   .select('b, Null(Types.LONG)).distinct()
> val results = t.toRetractStream[Row](queryConfig)
> results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
> env.execute()
> val expected = mutable.MutableList("1,null", "2,null", "3,null", 
> "4,null", "5,null", "6,null")
> assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
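
As a sketch of the suggested `Row` internals (assuming `Row` keeps its values in a private `Object[] fields` array, which is what the comment above implies), the private constructor and the more efficient `copy()` could look like this; an illustration only, not the actual change:

{code}
// requires java.util.Arrays
// inside org.apache.flink.types.Row (sketch)
private Row(Object[] fields) {
    this.fields = fields; // take ownership of the array, no per-field copying
}

public static Row copy(Row row) {
    // one array copy instead of a field-by-field getField()/setField() loop
    return new Row(Arrays.copyOf(row.fields, row.fields.length));
}
{code}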


[jira] [Commented] (FLINK-7426) Table API does not support null values in keys

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188419#comment-16188419
 ] 

ASF GitHub Bot commented on FLINK-7426:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4732#discussion_r142164921
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * Null-aware key selector.
+  */
+class CRowKeySelector(
+val keyFields: Array[Int],
+@transient var returnType: TypeInformation[Row])
+  extends KeySelector[CRow, Row]
+  with ResultTypeQueryable[Row] {
+
+  override def getKey(value: CRow): Row = {
+val row = value.row
+val fields = keyFields
--- End diff --

Why not use `keyFields` directly?


> Table API does not support null values in keys
> --
>
> Key: FLINK-7426
> URL: https://issues.apache.org/jira/browse/FLINK-7426
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.2
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> The Table API uses {{keyBy}} internally, however, the generated 
> {{KeySelector}} uses instances of {{Tuple}}. The {{TupleSerializer}} is not 
> able to serialize null values. This causes issues during checkpointing or 
> when using the RocksDB state backend. We need to replace all {{keyBy}} calls 
> with a custom {{RowKeySelector}}.
> {code}
> class AggregateITCase extends StreamingWithStateTestBase {
>   private val queryConfig = new StreamQueryConfig()
>   queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
>   @Test
>   def testDistinct(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStateBackend(getStateBackend)
> val tEnv = TableEnvironment.getTableEnvironment(env)
> StreamITCase.clear
> val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
>   .select('b, Null(Types.LONG)).distinct()
> val results = t.toRetractStream[Row](queryConfig)
> results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
> env.execute()
> val expected = mutable.MutableList("1,null", "2,null", "3,null", 
> "4,null", "5,null", "6,null")
> assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7426) Table API does not support null values in keys

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188420#comment-16188420
 ] 

ASF GitHub Bot commented on FLINK-7426:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4732#discussion_r142165496
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * Null-aware key selector.
+  */
+class CRowKeySelector(
+val keyFields: Array[Int],
+@transient var returnType: TypeInformation[Row])
+  extends KeySelector[CRow, Row]
+  with ResultTypeQueryable[Row] {
+
+  override def getKey(value: CRow): Row = {
+val row = value.row
+val fields = keyFields
+
+val newKey = new Row(fields.length)
+var i = 0
+while (i < fields.length) {
+  newKey.setField(i, row.getField(fields(i)))
--- End diff --

should we add a `project(int[])` method to `Row` similar to `Row.copy()` to 
reduce the number of method calls?


> Table API does not support null values in keys
> --
>
> Key: FLINK-7426
> URL: https://issues.apache.org/jira/browse/FLINK-7426
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.2
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> The Table API uses {{keyBy}} internally, however, the generated 
> {{KeySelector}} uses instances of {{Tuple}}. The {{TupleSerializer}} is not 
> able to serialize null values. This causes issues during checkpointing or 
> when using the RocksDB state backend. We need to replace all {{keyBy}} calls 
> with a custom {{RowKeySelector}}.
> {code}
> class AggregateITCase extends StreamingWithStateTestBase {
>   private val queryConfig = new StreamQueryConfig()
>   queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
>   @Test
>   def testDistinct(): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStateBackend(getStateBackend)
> val tEnv = TableEnvironment.getTableEnvironment(env)
> StreamITCase.clear
> val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
>   .select('b, Null(Types.LONG)).distinct()
> val results = t.toRetractStream[Row](queryConfig)
> results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
> env.execute()
> val expected = mutable.MutableList("1,null", "2,null", "3,null", 
> "4,null", "5,null", "6,null")
> assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4732: [FLINK-7426] [table] Support null values in keys

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4732#discussion_r142164921
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * Null-aware key selector.
+  */
+class CRowKeySelector(
+val keyFields: Array[Int],
+@transient var returnType: TypeInformation[Row])
+  extends KeySelector[CRow, Row]
+  with ResultTypeQueryable[Row] {
+
+  override def getKey(value: CRow): Row = {
+val row = value.row
+val fields = keyFields
--- End diff --

Why not use `keyFields` directly?


---


[GitHub] flink pull request #4732: [FLINK-7426] [table] Support null values in keys

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4732#discussion_r142167389
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.types.Row
+
+/**
+  * Null-aware key selector.
+  */
+class CRowKeySelector(
+val keyFields: Array[Int],
+@transient var returnType: TypeInformation[Row])
+  extends KeySelector[CRow, Row]
+  with ResultTypeQueryable[Row] {
+
+  override def getKey(value: CRow): Row = {
+val row = value.row
+val fields = keyFields
+
+val newKey = new Row(fields.length)
+var i = 0
+while (i < fields.length) {
+  newKey.setField(i, row.getField(fields(i)))
--- End diff --

if we do that, we can also make `Row.copy()` more efficient by adding a 
private constructor that receives an `Object[]` and implementing `copy()` as 
`return new Row(Arrays.copyOf(row.fields, row.fields.length));`


---


[jira] [Commented] (FLINK-7749) remove the ResultPartitionWriter

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188308#comment-16188308
 ] 

ASF GitHub Bot commented on FLINK-7749:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4762

[FLINK-7749][network] remove the ResultPartitionWriter wrapper

## What is the purpose of the change

After changing task event notification to the `TaskEventDispatcher` (#4759) 
and buffer writing to `ResultPartition` (#4761), this PR finally removes the 
`ResultPartitionWriter` wrapper.

## Brief change log

- remove `ResultPartitionWriter` and adapt all classes to use 
`ResultPartition` instead

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7749

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4762.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4762


commit d0cf483e0fb21010ed996935ffb14d62b34ee8ed
Author: Nico Kruber 
Date:   2017-08-29T15:32:52Z

[FLINK-7746][network] move ResultPartitionWriter#writeBufferToAllChannels 
implementation up into ResultPartition

commit cfea094aa214612eaafef14554bc68626a6ff948
Author: Nico Kruber 
Date:   2017-08-29T16:24:00Z

[FLINK-7748][network] properly use the TaskEventDispatcher for subscribing 
to events

Previously, the ResultPartitionWriter implemented the EventListener interface
and was used for event registration, although event publishing was already
handled via the TaskEventDispatcher. Now, we use the TaskEventDispatcher for
both event registration and publishing.

It also adds the TaskEventDispatcher to the Environment information for a 
task
to be able to work with it (only IterationHeadTask so far).

commit 88a0d0efc3e4daabbb6ac50ca0d5fa0481a333b6
Author: Nico Kruber 
Date:   2017-08-29T16:53:48Z

[FLINK-7749][network] remove the ResultPartitionWriter wrapper

Previous tasks, i.e. task event notification and buffer writing, are now 
handled
completely by the TaskEventDispatcher and the ResultPartition, respectively.




> remove the ResultPartitionWriter
> 
>
> Key: FLINK-7749
> URL: https://issues.apache.org/jira/browse/FLINK-7749
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> After changing task event notification to the {{TaskEventDispatcher}} 
> (FLINK-7746) and buffer writing to {{ResultPartition}} (FLINK-7748), we can 
> finally remove the {{ResultPartitionWriter}} wrapper.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4762: [FLINK-7749][network] remove the ResultPartitionWr...

2017-10-02 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4762

[FLINK-7749][network] remove the ResultPartitionWriter wrapper

## What is the purpose of the change

After changing task event notification to the `TaskEventDispatcher` (#4759) 
and buffer writing to `ResultPartition` (#4761), this PR finally removes the 
`ResultPartitionWriter` wrapper.

## Brief change log

- remove `ResultPartitionWriter` and adapt all classes to use 
`ResultPartition` instead

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7749

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4762.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4762


commit d0cf483e0fb21010ed996935ffb14d62b34ee8ed
Author: Nico Kruber 
Date:   2017-08-29T15:32:52Z

[FLINK-7746][network] move ResultPartitionWriter#writeBufferToAllChannels 
implementation up into ResultPartition

commit cfea094aa214612eaafef14554bc68626a6ff948
Author: Nico Kruber 
Date:   2017-08-29T16:24:00Z

[FLINK-7748][network] properly use the TaskEventDispatcher for subscribing 
to events

Previously, the ResultPartitionWriter implemented the EventListener interface
and was used for event registration, although event publishing was already
handled via the TaskEventDispatcher. Now, we use the TaskEventDispatcher for
both event registration and publishing.

It also adds the TaskEventDispatcher to the Environment information for a 
task
to be able to work with it (only IterationHeadTask so far).

commit 88a0d0efc3e4daabbb6ac50ca0d5fa0481a333b6
Author: Nico Kruber 
Date:   2017-08-29T16:53:48Z

[FLINK-7749][network] remove the ResultPartitionWriter wrapper

Previous tasks, i.e. task event notification and buffer writing, are now 
handled
completely by the TaskEventDispatcher and the ResultPartition, respectively.




---


[jira] [Commented] (FLINK-7748) remove event listener behaviour from ResultPartitionWriter

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188297#comment-16188297
 ] 

ASF GitHub Bot commented on FLINK-7748:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4761

[FLINK-7748][network] properly use the TaskEventDispatcher for subscribing 
to events

## What is the purpose of the change

`ResultPartitionWriter` currently implements the `EventListener` interface 
and is used for event registration, although event publishing is already 
handled via the `TaskEventDispatcher`.
Instead of using two different places, this should be unified by using the 
`TaskEventDispatcher` only, which is what this PR does.

Please note that this PR builds upon #4759.

## Brief change log

- make `TaskEventDispatcher` more generic to register result partitions via 
`ResultPartitionID` with one `TaskEventHandler` per partition (handled by 
`TaskEventDispatcher`, not inside `ResultPartitionWriter`)
- remove the `EventListener` implementation from 
`ResultPartitionWriter`
- add the `TaskEventDispatcher` to the `Environment` information for a task 
to be able to work with it (only `IterationHeadTask` is using this as of now)
- adapt all places to use `TaskEventDispatcher` instead of 
`ResultPartitionWriter`

## Verifying this change

This change added tests and can be verified as follows:

- a new `TaskEventDispatcherTest` verifies that `TaskEventDispatcher` works 
as expected
- indirectly, tests under `org.apache.flink.runtime.iterative` may verify 
the use of the `IterationHeadTask` which is the only user so far

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (JavaDocs)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7748

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4761.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4761


commit d0cf483e0fb21010ed996935ffb14d62b34ee8ed
Author: Nico Kruber 
Date:   2017-08-29T15:32:52Z

[FLINK-7746][network] move ResultPartitionWriter#writeBufferToAllChannels 
implementation up into ResultPartition

commit cfea094aa214612eaafef14554bc68626a6ff948
Author: Nico Kruber 
Date:   2017-08-29T16:24:00Z

[FLINK-7748][network] properly use the TaskEventDispatcher for subscribing 
to events

Previously, the ResultPartitionWriter implemented the EventListener interface
and was used for event registration, although event publishing was already
handled via the TaskEventDispatcher. Now, we use the TaskEventDispatcher for
both event registration and publishing.

It also adds the TaskEventDispatcher to the Environment information for a 
task
to be able to work with it (only IterationHeadTask so far).




> remove event listener behaviour from ResultPartitionWriter
> --
>
> Key: FLINK-7748
> URL: https://issues.apache.org/jira/browse/FLINK-7748
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> {{ResultPartitionWriter}} currently implements the {{EventListener}} 
> interface and is used for event registration, although event publishing is 
> already handled via the {{TaskEventDispatcher}}. This should be unified by 
> using {{TaskEventDispatcher}} only.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
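
The change log above describes the unified event path. As a rough sketch of that shape (method names and signatures are assumptions for illustration, not necessarily the exact Flink API), the dispatcher keeps one handler per registered partition:

{code}
import java.util.HashMap;
import java.util.Map;

// Sketch of the registration/publish flow described in the PR.
public class TaskEventDispatcher {

    // one TaskEventHandler per registered result partition
    private final Map<ResultPartitionID, TaskEventHandler> handlers = new HashMap<>();

    // called when a task sets up its partition (replaces the listener logic
    // that previously lived in ResultPartitionWriter)
    public void registerPartition(ResultPartitionID partitionId) {
        handlers.put(partitionId, new TaskEventHandler());
    }

    // event registration, e.g. the IterationHeadTask subscribing to superstep events
    public void subscribeToEvent(
            ResultPartitionID partitionId,
            EventListener<TaskEvent> listener,
            Class<? extends TaskEvent> eventType) {
        handlers.get(partitionId).subscribe(listener, eventType);
    }

    // event publishing; returns false if the partition is not registered
    public boolean publish(ResultPartitionID partitionId, TaskEvent event) {
        TaskEventHandler handler = handlers.get(partitionId);
        if (handler == null) {
            return false;
        }
        handler.publish(event);
        return true;
    }
}
{code}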


[GitHub] flink pull request #4761: [FLINK-7748][network] properly use the TaskEventDi...

2017-10-02 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4761

[FLINK-7748][network] properly use the TaskEventDispatcher for subscribing 
to events

## What is the purpose of the change

`ResultPartitionWriter` currently implements the `EventListener` interface 
and is used for event registration, although event publishing is already 
handled via the `TaskEventDispatcher`.
Instead of using two different places, this should be unified by using the 
`TaskEventDispatcher` only, which is what this PR does.

Please note that this PR builds upon #4759.

## Brief change log

- make `TaskEventDispatcher` more generic to register result partitions via 
`ResultPartitionID` with one `TaskEventHandler` per partition (handled by 
`TaskEventDispatcher`, not inside `ResultPartitionWriter`)
- remove the `EventListener` implementation from 
`ResultPartitionWriter`
- add the `TaskEventDispatcher` to the `Environment` information for a task 
to be able to work with it (only `IterationHeadTask` is using this as of now)
- adapt all places to use `TaskEventDispatcher` instead of 
`ResultPartitionWriter`

## Verifying this change

This change added tests and can be verified as follows:

- a new `TaskEventDispatcherTest` verifies that `TaskEventDispatcher` works 
as expected
- indirectly, tests under `org.apache.flink.runtime.iterative` may verify 
the use of the `IterationHeadTask` which is the only user so far

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (JavaDocs)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7748

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4761.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4761


commit d0cf483e0fb21010ed996935ffb14d62b34ee8ed
Author: Nico Kruber 
Date:   2017-08-29T15:32:52Z

[FLINK-7746][network] move ResultPartitionWriter#writeBufferToAllChannels 
implementation up into ResultPartition

commit cfea094aa214612eaafef14554bc68626a6ff948
Author: Nico Kruber 
Date:   2017-08-29T16:24:00Z

[FLINK-7748][network] properly use the TaskEventDispatcher for subscribing 
to events

Previously, the ResultPartitionWriter implemented the EventListener interface
and was used for event registration, although event publishing was already
handled via the TaskEventDispatcher. Now, we use the TaskEventDispatcher for
both event registration and publishing.

It also adds the TaskEventDispatcher to the Environment information for a 
task
to be able to work with it (only IterationHeadTask so far).




---


[jira] [Commented] (FLINK-7051) Bump up Calcite version to 1.14

2017-10-02 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188254#comment-16188254
 ] 

Fabian Hueske commented on FLINK-7051:
--

The vote for Calcite 1.14 has passed and it will be released very soon.

> Bump up Calcite version to 1.14
> ---
>
> Key: FLINK-7051
> URL: https://issues.apache.org/jira/browse/FLINK-7051
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Haohui Mai
>
> This is an umbrella issue for all tasks that need to be done once Apache 
> Calcite 1.14 is released.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7728) StatusWatermarkValve has different min watermark advancement behavior depending on the ordering inputs become idle

2017-10-02 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7728.
---
   Resolution: Fixed
Fix Version/s: (was: 1.2.2)

Fixed on release-1.3 in
4dcd9fba21eef157063d2d70b0e55869009f2954
2875260d067c1e6754a248b8e281516ad9b5a269
c528139c3c2b33ba7bd11df154ba204408713b02

> StatusWatermarkValve has different min watermark advancement behavior 
> depending on the ordering inputs become idle
> --
>
> Key: FLINK-7728
> URL: https://issues.apache.org/jira/browse/FLINK-7728
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.1, 1.4.0, 1.3.2
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Currently, once all inputs of a {{StatusWatermarkValve}} become idle, we 
> only emit the {{StreamStatus.IDLE}} marker, and check nothing else. This 
> makes the watermark advancement behavior inconsistent in the case that all 
> inputs become idle, depending on the order in which they become idle.
> Consider the following setup:
> {code}
> Channel #1: Watermark 10, ACTIVE
> Channel #2: Watermark 5, ACTIVE
> Channel #3: Watermark 5, ACTIVE
> {code}
> If the channels become IDLE in the order #2 -> #3 -> #1, watermark 10 will 
> be emitted as the new min watermark once both #2 and #3 become idle.
> On the other hand, if the order is #1 -> #2 -> #3, watermark 10 will not be 
> emitted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
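
For the example above, the fix (see PR #4747) is to flush the valve once the last input turns idle, so that channel #1's watermark 10 is emitted regardless of the order in which the channels became idle. A simplified sketch of that logic follows; field and method names are assumptions for illustration, not the actual implementation:

{code}
// Sketch: when a channel turns idle, check whether all channels are now idle;
// if so, advance to the max watermark seen across all channels before going IDLE.
private void onChannelIdle(int channelIndex) {
    channels[channelIndex].isIdle = true;

    boolean allIdle = true;
    long maxWatermark = Long.MIN_VALUE;
    for (ChannelState channel : channels) {
        allIdle &= channel.isIdle;
        maxWatermark = Math.max(maxWatermark, channel.watermark);
    }

    if (allIdle) {
        // flush: e.g. watermark 10 from channel #1, independent of idle ordering
        if (maxWatermark > lastEmittedWatermark) {
            lastEmittedWatermark = maxWatermark;
            emitWatermark(maxWatermark);
        }
        emitStreamStatus(StreamStatus.IDLE);
    }
}
{code}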


[GitHub] flink issue #4747: [FLINK-7728] [DataStream] Flush StatusWatermarkValve once...

2017-10-02 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4747
  
Merged. 👌 Could you please also close this PR?


---


[jira] [Commented] (FLINK-7728) StatusWatermarkValve has different min watermark advancement behavior depending on the ordering inputs become idle

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188232#comment-16188232
 ] 

ASF GitHub Bot commented on FLINK-7728:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4747
  
Merged. Could you please also close this PR?


> StatusWatermarkValve has different min watermark advancement behavior 
> depending on the ordering inputs become idle
> --
>
> Key: FLINK-7728
> URL: https://issues.apache.org/jira/browse/FLINK-7728
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.1, 1.4.0, 1.3.2
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Currently, once all inputs of a {{StatusWatermarkValve}} become idle, we 
> only emit the {{StreamStatus.IDLE}} marker, and check nothing else. This 
> makes the watermark advancement behavior inconsistent in the case that all 
> inputs become idle, depending on the order in which they become idle.
> Consider the following setup:
> {code}
> Channel #1: Watermark 10, ACTIVE
> Channel #2: Watermark 5, ACTIVE
> Channel #3: Watermark 5, ACTIVE
> {code}
> If the channels become IDLE in the order #2 -> #3 -> #1, watermark 10 will 
> be emitted as the new min watermark once both #2 and #3 become idle.
> On the other hand, if the order is #1 -> #2 -> #3, watermark 10 will not be 
> emitted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188220#comment-16188220
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r142159219
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,417 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
+ * (See org.apache.flink.api.scala.Types and 
org.apache.flink.table.api.Types)
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo<String> STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo<Boolean> BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo<Byte> BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo<Short> SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo<Integer> INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo<Long> LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo<Float> FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo<Double> DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo<BigDecimal> DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation<Void> VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation<String> STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and 
{@link java.lang.Byte}.
+* Does not support a null value.
+*/
+   public static final TypeInformation<Byte> BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and {@link java.lang.Boolean}.
+* Does not support a null value.
+*/
+   public static final TypeInformation<Boolean> BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
{@link java.lang.Short}.
+* Does not support a null value.
+*/
+   public static final TypeInformation<Short> SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive int and 
{@link java.lang.Integer}.
+* Does not support a null value.
+*/
+   public static final TypeInformation<Integer> INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and 
{@link java.lang.Long}.
+* 

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r142158422
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,417 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
--- End diff --

"or would result in an inefficient type" -> "or cases where automatic type 
inference results in an inefficient type."


---


[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188221#comment-16188221
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r142158422
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,417 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
--- End diff --

"or would result in an inefficient type" -> "or cases where automatic type 
inference results in an inefficient type."


> Add helper methods for all built-in Flink types to Types
> 
>
> Key: FLINK-7452
> URL: https://issues.apache.org/jira/browse/FLINK-7452
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Sometimes it is very difficult to provide `TypeInformation` manually, in case 
> some extraction fails or is not available. {{TypeHint}}s should be the 
> preferred way, but these methods can ensure correct types.
> I propose to add all built-in Flink types to the {{Types}}. Such as:
> {code}
> Types.POJO(MyPojo.class)
> Types.POJO(Map)
> Types.GENERIC(Object.class)
> Types.TUPLE(TypeInformation, ...)
> Types.MAP(TypeInformation, TypeInformation)
> {code}
> The methods should validate that the returned type is exactly the requested 
> type and, especially in the case of POJOs, should help with creating {{PojoTypeInfo}}.
> Once this is in place, we can deprecate the {{TypeInfoParser}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
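
Assuming the helpers land as proposed, usage would look roughly like this (the POJO class {{MyPojo}} is the placeholder from the description; the {{TypeHint}} variant is the alternative mentioned above):

{code}
// explicit type information via the proposed helper methods
TypeInformation<Tuple2<String, Integer>> tupleInfo =
    Types.TUPLE(Types.STRING, Types.INT);
TypeInformation<Map<String, Long>> mapInfo =
    Types.MAP(Types.STRING, Types.LONG);
TypeInformation<MyPojo> pojoInfo = Types.POJO(MyPojo.class);

// the TypeHint alternative, preferred where type extraction works
TypeInformation<Tuple2<String, Integer>> viaHint =
    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});
{code}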


[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r142159219
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,417 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where type 
information has to be
+ * supplied manually or would result in an inefficient type.
+ *
+ * Please note that the Scala API and Table API provide more 
specialized Types classes.
+ * (See org.apache.flink.api.scala.Types and 
org.apache.flink.table.api.Types)
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo<String> STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo<Boolean> BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo<Byte> BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo<Short> SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo<Integer> INT = 
BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo<Long> LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo<Float> FLOAT = 
BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo<Double> DOUBLE = 
BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo<BigDecimal> DECIMAL = 
BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation<Void> VOID = 
BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation<String> STRING = 
BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and 
{@link java.lang.Byte}.
+* Does not support a null value.
+*/
+   public static final TypeInformation<Byte> BYTE = 
BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and {@link java.lang.Boolean}.
+* Does not support a null value.
+*/
+   public static final TypeInformation<Boolean> BOOLEAN = 
BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
{@link java.lang.Short}.
+* Does not support a null value.
+*/
+   public static final TypeInformation<Short> SHORT = 
BasicTypeInfo.SHORT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive int and 
{@link java.lang.Integer}.
+* Does not support a null value.
+*/
+   public static final TypeInformation<Integer> INT = 
BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and 
{@link java.lang.Long}.
+* Does not support a null value.
+*/
+   public static final TypeInformation<Long> LONG = 
BasicTypeInfo.LONG_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive float and 
{@link java.lang.Float}.

[jira] [Commented] (FLINK-7728) StatusWatermarkValve has different min watermark advancement behavior depending on the ordering inputs become idle

2017-10-02 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188218#comment-16188218
 ] 

Aljoscha Krettek commented on FLINK-7728:
-

Fixed on master in 
6481564cb581164a761444922ddfb4ad11a8ef55
f9b79662ef575580a841af36747618a29a3955d9
ea86a537e6db1606241c0b8afe03c0754176c85e

> StatusWatermarkValve has different min watermark advancement behavior 
> depending on the ordering inputs become idle
> --
>
> Key: FLINK-7728
> URL: https://issues.apache.org/jira/browse/FLINK-7728
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.1, 1.4.0, 1.3.2
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.2.2, 1.4.0, 1.3.3
>
>
> Currently, once all inputs of a {{StatusWatermarkValve}} become idle, we 
> only emit the {{StreamStatus.IDLE}} marker, and check nothing else. This 
> makes the watermark advancement behavior inconsistent in the case that all 
> inputs become idle, depending on the order in which they become idle.
> Consider the following setup:
> {code}
> Channel #1: Watermark 10, ACTIVE
> Channel #2: Watermark 5, ACTIVE
> Channel #3: Watermark 5, ACTIVE
> {code}
> If the channels become IDLE in the order #2 -> #3 -> #1, watermark 10 will 
> be emitted as the new min watermark once both #2 and #3 become idle.
> On the other hand, if the order is #1 -> #2 -> #3, watermark 10 will not be 
> emitted.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7721) StatusWatermarkValve should output a new min watermark only if it was aggregated from aligned channels

2017-10-02 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7721.
---
Resolution: Fixed

Fixed on master in c81a7e519bf3db2f9c5e62fadd7a3a4fb7692904

> StatusWatermarkValve should output a new min watermark only if it was 
> aggregated from aligned channels
> --
>
> Key: FLINK-7721
> URL: https://issues.apache.org/jira/browse/FLINK-7721
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.1, 1.4.0, 1.3.2
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Context:
> {code}
> long newMinWatermark = Long.MAX_VALUE;
> for (InputChannelStatus channelStatus : channelStatuses) {
> if (channelStatus.isWatermarkAligned) {
> newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
> }
> }
> {code}
> In the calculation of the new min watermark in 
> {{StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels()}}, 
> there is no verification that the calculated new min watermark 
> {{newMinWatermark}} really is aggregated from some aligned channel.
> In the corner case where all input channels are currently not aligned but 
> actually some are active, we would then incorrectly determine that the final 
> aggregation of {{newMinWatermark}} is {{Long.MAX_VALUE}} and emit that.
> The fix would simply be to only emit the aggregated watermark IFF it was 
> really calculated from some aligned input channel (as well as the already 
> existing constraint that it needs to be larger than the last emitted 
> watermark). This change should also safely cover the case that a 
> {{Long.MAX_VALUE}} was genuinely aggregated from one of the input channels.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
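
A sketch of the guard described above, reusing the names from the quoted snippet (the emission helpers are assumptions for illustration):

{code}
long newMinWatermark = Long.MAX_VALUE;
boolean hasAlignedChannels = false;

for (InputChannelStatus channelStatus : channelStatuses) {
    if (channelStatus.isWatermarkAligned) {
        hasAlignedChannels = true;
        newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
    }
}

// emit only if the aggregate really came from an aligned channel and advances
// the last emitted watermark; this also keeps a genuine Long.MAX_VALUE
// watermark from an aligned channel working correctly
if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
    lastOutputWatermark = newMinWatermark;
    outputWatermark(newMinWatermark);
}
{code}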


[jira] [Reopened] (FLINK-7721) StatusWatermarkValve should output a new min watermark only if it was aggregated from aligned channels

2017-10-02 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reopened FLINK-7721:
-

Reopened to fix the git commit hash

> StatusWatermarkValve should output a new min watermark only if it was 
> aggregated from aligned channels
> --
>
> Key: FLINK-7721
> URL: https://issues.apache.org/jira/browse/FLINK-7721
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.2.1, 1.4.0, 1.3.2
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> Context:
> {code}
> long newMinWatermark = Long.MAX_VALUE;
> for (InputChannelStatus channelStatus : channelStatuses) {
> if (channelStatus.isWatermarkAligned) {
> newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
> }
> }
> {code}
> In the calculation of the new min watermark in 
> {{StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels()}}, 
> there is no verification that the calculated new min watermark 
> {{newMinWatermark}} really is aggregated from some aligned channel.
> In the corner case where all input channels are currently not aligned but 
> actually some are active, we would then incorrectly determine that the final 
> aggregation of {{newMinWatermark}} is {{Long.MAX_VALUE}} and emit that.
> The fix would simply be to only emit the aggregated watermark IFF it was 
> really calculated from some aligned input channel (as well as the already 
> existing constraint that it needs to be larger than the last emitted 
> watermark). This change should also safely cover the case that a 
> {{Long.MAX_VALUE}} was genuinely aggregated from one of the input channels.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r142157854
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -20,60 +20,123 @@ package org.apache.flink.table.api
 import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, 
TypeInformation, Types => JTypes}
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, 
ObjectArrayTypeInfo}
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-import org.apache.flink.types.Row
 
 import _root_.scala.annotation.varargs
 
 /**
-  * This class enumerates all supported types of the Table API.
+  * This class enumerates all supported types of the Table API & SQL.
   */
 object Types {
--- End diff --

Makes sense, thanks


---


[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188202#comment-16188202
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r142157854
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala
 ---
@@ -20,60 +20,123 @@ package org.apache.flink.table.api
 import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, 
TypeInformation, Types => JTypes}
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, 
ObjectArrayTypeInfo}
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
-import org.apache.flink.types.Row
 
 import _root_.scala.annotation.varargs
 
 /**
-  * This class enumerates all supported types of the Table API.
+  * This class enumerates all supported types of the Table API & SQL.
   */
 object Types {
--- End diff --

Makes sense, thanks


> Add helper methods for all built-in Flink types to Types
> 
>
> Key: FLINK-7452
> URL: https://issues.apache.org/jira/browse/FLINK-7452
> Project: Flink
>  Issue Type: Improvement
>  Components: Type Serialization System
>Reporter: Timo Walther
>Assignee: Timo Walther
>
> Sometimes it is very difficult to provide `TypeInformation` manually, in case 
> some extraction fails or is not available. {{TypeHint}}s should be the 
> preferred way, but these methods can ensure correct types.
> I propose to add all built-in Flink types to the {{Types}}. Such as:
> {code}
> Types.POJO(MyPojo.class)
> Types.POJO(Map)
> Types.GENERIC(Object.class)
> Types.TUPLE(TypeInformation, ...)
> Types.MAP(TypeInformation, TypeInformation)
> {code}
> The methods should validate that the returned type is exactly the requested 
> type and, especially in the case of POJOs, should help with creating {{PojoTypeInfo}}.
> Once this is in place, we can deprecate the {{TypeInfoParser}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188200#comment-16188200
 ] 

ASF GitHub Bot commented on FLINK-7452:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r142157686
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo<String> STRING = BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo<Boolean> BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo<Byte> BYTE = BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo<Short> SHORT = BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo<Integer> INT = BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo<Long> LONG = BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo<Float> FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo<Double> DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo<BigDecimal> DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation<Void> VOID = BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation<String> STRING = BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation<Byte> BYTE = BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation<Boolean> BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation<Short> SHORT = BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo<Date> SQL_DATE = SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo<Time> SQL_TIME = SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo<Timestamp> SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped 

[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...

2017-10-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4612#discussion_r142157686
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java ---
@@ -19,56 +19,408 @@
 package org.apache.flink.api.common.typeinfo;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.EnumTypeInfo;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.PojoField;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
- * This class gives access to the type information of the most most common 
types.
+ * This class gives access to the type information of the most common 
types for which Flink
+ * has built-in serializers and comparators.
+ *
+ * In many cases, Flink tries to analyze generic signatures of 
functions to determine return
+ * types automatically. This class is intended for cases where the 
extraction is not possible
+ * (or inefficient) as well as cases where type information has to be 
supplied manually.
+ *
+ * Depending on the API you are using (e.g. Scala API or Table API), 
there might be a more
+ * specialized Types class.
+ *
+ * A more convenient alternative might be a {@link TypeHint}.
+ *
+ * @see TypeInformation#of(Class) specify type information based on a 
class that will be analyzed
+ * @see TypeInformation#of(TypeHint) specify type information based on a 
{@link TypeHint}
  */
 @PublicEvolving
 public class Types {
 
-   public static final BasicTypeInfo<String> STRING = BasicTypeInfo.STRING_TYPE_INFO;
-   public static final BasicTypeInfo<Boolean> BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO;
-   public static final BasicTypeInfo<Byte> BYTE = BasicTypeInfo.BYTE_TYPE_INFO;
-   public static final BasicTypeInfo<Short> SHORT = BasicTypeInfo.SHORT_TYPE_INFO;
-   public static final BasicTypeInfo<Integer> INT = BasicTypeInfo.INT_TYPE_INFO;
-   public static final BasicTypeInfo<Long> LONG = BasicTypeInfo.LONG_TYPE_INFO;
-   public static final BasicTypeInfo<Float> FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO;
-   public static final BasicTypeInfo<Double> DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO;
-   public static final BasicTypeInfo<BigDecimal> DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO;
+   /**
+* Returns type information for {@link java.lang.Void}. Does not 
support a null value.
+*/
+   public static final TypeInformation<Void> VOID = BasicTypeInfo.VOID_TYPE_INFO;
+
+   /**
+* Returns type information for {@link java.lang.String}. Supports a 
null value.
+*/
+   public static final TypeInformation<String> STRING = BasicTypeInfo.STRING_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive byte and a
+* wrapped {@link java.lang.Byte}. Does not support a null value.
+*/
+   public static final TypeInformation<Byte> BYTE = BasicTypeInfo.BYTE_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive boolean 
and a
+* wrapped {@link java.lang.Boolean}. Does not support a null value.
+*/
+   public static final TypeInformation<Boolean> BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive short and 
a
+* wrapped {@link java.lang.Short}. Does not support a null value.
+*/
+   public static final TypeInformation<Short> SHORT = BasicTypeInfo.SHORT_TYPE_INFO;
 
-   public static final SqlTimeTypeInfo<Date> SQL_DATE = SqlTimeTypeInfo.DATE;
-   public static final SqlTimeTypeInfo<Time> SQL_TIME = SqlTimeTypeInfo.TIME;
-   public static final SqlTimeTypeInfo<Timestamp> SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP;
+   /**
+* Returns type information for both a primitive int and a
+* wrapped {@link java.lang.Integer}. Does not support a null value.
+*/
+   public static final TypeInformation<Integer> INT = BasicTypeInfo.INT_TYPE_INFO;
+
+   /**
+* Returns type information for both a primitive long and a
+   

[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188191#comment-16188191
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142157044
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link JobSubmitHandler}.
+ */
+public class JobSubmitHandlerTest {
+
+   @Test
+   public void testSerializationFailureHandling() throws Exception {
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+   GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   JobSubmitHandler handler = new JobSubmitHandler(
+   
CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT);
+
+   JobSubmitRequestBody request = new JobSubmitRequestBody(new 
byte[0]);
+
+   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()), mockGateway);
+
+   try {
+   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()), mockGateway).get();
+   Assert.fail();
+   } catch (ExecutionException ee) {
+   RestHandlerException rhe = (RestHandlerException) 
ee.getCause();
+
+   Assert.assertEquals(HttpResponseStatus.BAD_REQUEST, 
rhe.getHttpResponseStatus());
+   }
+   }
+
+   @Test
+   public void testSuccessfulJobSubmission() throws Exception {
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+   GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   JobSubmitHandler handler = new JobSubmitHandler(
+   
CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT);
+
+   JobGraph job = new JobGraph("testjob");
+   JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+
+   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()), mockGateway).get();
--- End diff --

we call get() on the returned future, which will fail if the job submission 
was not successful. But I will move the get() to the next line since it can be 
easily missed.

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142157044
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link JobSubmitHandler}.
+ */
+public class JobSubmitHandlerTest {
+
+   @Test
+   public void testSerializationFailureHandling() throws Exception {
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+   GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   JobSubmitHandler handler = new JobSubmitHandler(
+   
CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT);
+
+   JobSubmitRequestBody request = new JobSubmitRequestBody(new 
byte[0]);
+
+   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()), mockGateway);
+
+   try {
+   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()), mockGateway).get();
+   Assert.fail();
+   } catch (ExecutionException ee) {
+   RestHandlerException rhe = (RestHandlerException) 
ee.getCause();
+
+   Assert.assertEquals(HttpResponseStatus.BAD_REQUEST, 
rhe.getHttpResponseStatus());
+   }
+   }
+
+   @Test
+   public void testSuccessfulJobSubmission() throws Exception {
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   when(mockGateway.submitJob(any(JobGraph.class), 
any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+   GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   JobSubmitHandler handler = new JobSubmitHandler(
+   
CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT);
+
+   JobGraph job = new JobGraph("testjob");
+   JobSubmitRequestBody request = new JobSubmitRequestBody(job);
+
+   handler.handleRequest(new HandlerRequest<>(request, 
EmptyMessageParameters.getInstance()), mockGateway).get();
--- End diff --

we call get() on the returned future, which will fail if the job submission 
was not successful. But I will move the get() to the next line since it can be 
easily missed.


---


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188189#comment-16188189
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142156635
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+
+/**
+ * Request for submitting a job.
+ *
+ * We currently require the job-jars to be uploaded through the 
blob-server.
+ */
+public final class JobSubmitRequestBody implements RequestBody {
+
+   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
+
+   /**
+* The serialized job graph.
+*/
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
+   public final byte[] serializedJobGraph;
--- End diff --

the current max is 10mb.
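
For context, a minimal client-side sketch of producing that payload, assuming 
the byte-array constructor that the handler tests in this PR exercise:

{code}
// Sketch: serialize a JobGraph into the request payload on the client side.
// jobGraph is the JobGraph to submit; imports match the diff above.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
    oos.writeObject(jobGraph);
}
JobSubmitRequestBody request = new JobSubmitRequestBody(baos.toByteArray());
{code}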


> Create RESTful cluster endpoint
> ---
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to communicate with the cluster from the RESTful client, we have to 
> implement a RESTful cluster endpoint. The endpoint shall support the 
> following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Lookup job leader (GET): Gets the JM leader for the given job
> This endpoint will run in session mode alongside the dispatcher/session 
> runner and forward calls to this component which maintains a view on all 
> currently executed jobs.
> In the per-job mode, the endpoint will return only the single running job and 
> the address of the JobManager alongside which it is running. Furthermore, it 
> won't accept job submissions.
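
As a rough end-to-end illustration of the operations listed above (a sketch 
based only on types appearing in this PR; the exact client method names are 
assumptions, not the final API):

{code}
// Sketch: session-mode submission and termination via the Flip-6 REST client.
Configuration config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");

RestClusterClient client = new RestClusterClient(config);
JobGraph jobGraph = new JobGraph("example-job");
client.submitJob(jobGraph);        // POST: submit the serialized job graph
client.stop(jobGraph.getJobID());  // terminate via the job termination message
{code}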



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188190#comment-16188190
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142156728
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link BlobServerPortHandler}.
+ */
+public class BlobServerPortHandlerTest {
+   private static final int PORT = 64;
+
+   @Test
+   public void testPortRetrieval() throws Exception {
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   
when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(PORT));
+   GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   BlobServerPortHandler handler = new BlobServerPortHandler(
+   
CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT);
+
+   BlobServerPortResponseBody portResponse = 
handler.handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), 
EmptyMessageParameters.getInstance()), mockGateway).get();
+
+   Assert.assertEquals(PORT, portResponse.port);
+   }
+
+   @Test
+   public void testPortRetrievalFailureHandling() throws Exception {
+
+   }
--- End diff --

jup


> Create RESTful cluster endpoint
> ---
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to communicate with the cluster from the RESTful client, we have to 
> implement a RESTful cluster endpoint. The endpoint shall support the 
> following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Lookup job leader (GET): Gets the JM leader for the given job
> This endpoint will run in session mode alongside the dispatcher/session 
> runner and forward calls to this component which maintains a view on all 
> currently executed jobs.
> In the per-job mode, the endpoint will return only the single running job and 
> the address of the JobManager alongside which it is running. Furthermore, it 
> won't accept job submissions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188188#comment-16188188
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142156568
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
 ---
@@ -28,8 +28,8 @@
  */
 public class JobTerminationMessageParameters extends MessageParameters {
 
-   private final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
-   private final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
+   public final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
+   public final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
--- End diff --

so that you can actually access (and resolve) them in 
`RestClusterClient#stop()`.
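
A sketch of the resolution in question (only the parameter classes are taken 
from this PR; the resolve() calls and enum constant are assumptions for 
illustration):

{code}
// Sketch: how a client such as RestClusterClient#stop() could resolve the
// now-public message parameters before issuing the request.
JobTerminationMessageParameters params = new JobTerminationMessageParameters();
params.jobPathParameter.resolve(jobId); // jobId: the job to terminate
params.terminationModeQueryParameter.resolve(
    Collections.singletonList(TerminationModeQueryParameter.TerminationMode.STOP));
{code}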


> Create RESTful cluster endpoint
> ---
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> In order to communicate with the cluster from the RESTful client, we have to 
> implement a RESTful cluster endpoint. The endpoint shall support the 
> following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session 
> mode)
> * Get job status (GET): Get the status of an executed job (and maybe the 
> JobExecutionResult)
> * Lookup job leader (GET): Gets the JM leader for the given job
> This endpoint will run in session mode alongside the dispatcher/session 
> runner and forward calls to this component which maintains a view on all 
> currently executed jobs.
> In the per-job mode, the endpoint will return only the single running job and 
> the address of the JobManager alongside which it is running. Furthermore, it 
> won't accept job submissions.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142156728
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link BlobServerPortHandler}.
+ */
+public class BlobServerPortHandlerTest {
+   private static final int PORT = 64;
+
+   @Test
+   public void testPortRetrieval() throws Exception {
+   DispatcherGateway mockGateway = mock(DispatcherGateway.class);
+   
when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(PORT));
+   GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   BlobServerPortHandler handler = new BlobServerPortHandler(
+   
CompletableFuture.completedFuture("http://localhost:1234"),
+   mockGatewayRetriever,
+   RpcUtils.INF_TIMEOUT);
+
+   BlobServerPortResponseBody portResponse = 
handler.handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), 
EmptyMessageParameters.getInstance()), mockGateway).get();
+
+   Assert.assertEquals(PORT, portResponse.port);
+   }
+
+   @Test
+   public void testPortRetrievalFailureHandling() throws Exception {
+
+   }
--- End diff --

jup


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142156635
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java
 ---
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+
+/**
+ * Request for submitting a job.
+ *
+ * We currently require the job-jars to be uploaded through the 
blob-server.
+ */
+public final class JobSubmitRequestBody implements RequestBody {
+
+   private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = 
"serializedJobGraph";
+
+   /**
+* The serialized job graph.
+*/
+   @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH)
+   public final byte[] serializedJobGraph;
--- End diff --

the current max is 10mb.


---


[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142156568
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java
 ---
@@ -28,8 +28,8 @@
  */
 public class JobTerminationMessageParameters extends MessageParameters {
 
-   private final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
-   private final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
+   public final JobIDPathParameter jobPathParameter = new 
JobIDPathParameter();
+   public final TerminationModeQueryParameter 
terminationModeQueryParameter = new TerminationModeQueryParameter();
--- End diff --

so that you can actually access (and resolve) them in 
`RestClusterClient#stop()`.


---


[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188182#comment-16188182
 ] 

ASF GitHub Bot commented on FLINK-7072:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142155888
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = 
mock(Dispatcher.class);
+   private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+   
when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+   
when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
+
+   Configuration config = new Configuration();
+   config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+   RestServerEndpointConfiguration rsec = 
RestServerEndpointConfiguration.fromConfiguration(config);
+
+   TestBlobServerPortHandler portHandler = new 
TestBlobServerPortHandler();
+   TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
+   TestJobTerminationHandler 

[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...

2017-10-02 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4742#discussion_r142155888
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
+import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
+import 
org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
+import 
org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
+import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link RestClusterClient}.
+ */
+public class RestClusterClientTest {
+
+   private static final String restAddress = "http://localhost:1234";
+   private static final Dispatcher mockRestfulGateway = 
mock(Dispatcher.class);
+   private static final GatewayRetriever<DispatcherGateway> mockGatewayRetriever = mock(GatewayRetriever.class);
+
+   static {
+   
when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
+   
when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway));
+   }
+
+   @Test
+   public void testABC() throws Exception {
+
+   Configuration config = new Configuration();
+   config.setString(JobManagerOptions.ADDRESS, "localhost");
+
+   RestServerEndpointConfiguration rsec = 
RestServerEndpointConfiguration.fromConfiguration(config);
+
+   TestBlobServerPortHandler portHandler = new 
TestBlobServerPortHandler();
+   TestJobSubmitHandler submitHandler = new TestJobSubmitHandler();
+   TestJobTerminationHandler terminationHandler = new 
TestJobTerminationHandler();
+
+   RestServerEndpoint rse = new RestServerEndpoint(rsec) {
+   @Override
+   protected 

[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188181#comment-16188181
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4509
  
@NicoK , I have fixed the other code issues. 

For UT, I only added one test verifying that there is no race condition 
between requesting and recycling floating buffers. I am not sure whether 
controlling the process via `CountDownLatch` is enough, or whether I should submit two 
different threads to execute the process repeatedly. If this way is ok, I will 
add more race-condition tests along with the 
`NetworkBufferPool#createBufferPool()` modifications you suggested. 

For the fair distribution of buffers, I will add one test, maybe in this 
PR or in #4735 later.
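
For reference, a self-contained toy of the latch-based race-test pattern under 
discussion (an AtomicInteger stands in for the floating buffer pool; the real 
test would drive the RemoteInputChannel and buffer recycling instead):

{code}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class FloatingBufferRaceSketch {
    public static void main(String[] args) throws Exception {
        AtomicInteger pool = new AtomicInteger(8);    // available floating buffers
        CountDownLatch start = new CountDownLatch(1); // released once both tasks wait
        ExecutorService exec = Executors.newFixedThreadPool(2);

        Future<?> requester = exec.submit(() -> {
            start.await();
            pool.decrementAndGet();                   // "request" a floating buffer
            return null;
        });
        Future<?> recycler = exec.submit(() -> {
            start.await();
            pool.incrementAndGet();                   // "recycle" a floating buffer
            return null;
        });

        start.countDown();                            // fire both at (almost) the same time
        requester.get();
        recycler.get();
        if (pool.get() != 8) {
            throw new AssertionError("request/recycle accounting out of balance");
        }
        exec.shutdown();
    }
}
{code}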


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from the producer. It requests a buffer from {{BufferPool}} 
> to hold the message. If no buffer is available, the message is staged temporarily and 
> {{autoread}} for the channel is set to false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get a 
> buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}
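
The last item above can be sketched with a self-contained toy model (all names 
are stand-ins, not Flink's actual API):

{code}
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model: a backlog announcement triggers requests of floating buffers
// until the channel holds backlog + initialCredit buffers or the pool is empty.
class ToyRemoteInputChannel {
    private final Queue<Object> availableBuffers = new ArrayDeque<>();
    private final Queue<Object> floatingPool = new ArrayDeque<>();
    private final int initialCredit = 2;

    ToyRemoteInputChannel(int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            floatingPool.add(new Object());
        }
    }

    // Called when a BufferResponse carries the sender's backlog.
    void onSenderBacklog(int backlog) {
        int required = backlog + initialCredit;
        while (availableBuffers.size() < required && !floatingPool.isEmpty()) {
            availableBuffers.add(floatingPool.poll()); // request one floating buffer
        }
    }
}
{code}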



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4509: [FLINK-7406][network] Implement Netty receiver incoming p...

2017-10-02 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4509
  
@NicoK , I have fixed the other code issues. 

For UT, I only added one test verifying that there is no race condition 
between requesting and recycling floating buffers. I am not sure whether 
controlling the process via `CountDownLatch` is enough, or whether I should submit two 
different threads to execute the process repeatedly. If this way is ok, I will 
add more race-condition tests along with the 
`NetworkBufferPool#createBufferPool()` modifications you suggested. 

For the fair distribution of buffers, I will add one test, maybe in this 
PR or in #4735 later.


---


[jira] [Assigned] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint

2017-10-02 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-7709:


Assignee: Till Rohrmann

> Port CheckpointStatsDetailsHandler to new REST endpoint
> ---
>
> Key: FLINK-7709
> URL: https://issues.apache.org/jira/browse/FLINK-7709
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{CheckpointStatsDetailsHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188047#comment-16188047
 ] 

ASF GitHub Bot commented on FLINK-7406:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r142142736
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -235,14 +240,15 @@ void releaseBuffer() {
ByteBuf write(ByteBufAllocator allocator) throws IOException {
checkNotNull(buffer, "No buffer instance to 
serialize.");
 
-   int length = 16 + 4 + 1 + 4 + buffer.getSize();
+   int length = 16 + 4 + 4 + 1 + 4 + buffer.getSize();
--- End diff --

I think the lengths of the corresponding fields can be seen clearly in the 
write method below.
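
For readers following the thread, one plausible decomposition of that length 
(my reading of the fields in this diff, not confirmed against the final code):

{code}
// Probable header layout behind the computed length:
//   16 bytes  receiver ID (InputChannelID, two longs)
//    4 bytes  sequence number
//    4 bytes  sender backlog (the field added for credit-based flow control)
//    1 byte   isBuffer flag
//    4 bytes  buffer size
int length = 16 + 4 + 4 + 1 + 4 + buffer.getSize();
{code}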


> Implement Netty receiver incoming pipeline for credit-based
> ---
>
> Key: FLINK-7406
> URL: https://issues.apache.org/jira/browse/FLINK-7406
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> Currently {{PartitionRequestClientHandler}} receives and reads 
> {{BufferResponse}} from the producer. It requests a buffer from {{BufferPool}} 
> to hold the message. If no buffer is available, the message is staged temporarily and 
> {{autoread}} for the channel is set to false.
> For credit-based mode, {{PartitionRequestClientHandler}} can always get a 
> buffer from {{RemoteInputChannel}} for reading messages from the producer.
> The related works are:
> * Add the backlog of producer in {{BufferResponse}} message structure
> * {{PartitionRequestClientHandler}} requests buffer from 
> {{RemoteInputChannel}} directly
> * {{PartitionRequestClientHandler}} updates backlog for 
> {{RemoteInputChannel}}, and it may trigger requests of floating buffers from 
> {{BufferPool}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...

2017-10-02 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4509#discussion_r142142736
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
 ---
@@ -235,14 +240,15 @@ void releaseBuffer() {
ByteBuf write(ByteBufAllocator allocator) throws IOException {
checkNotNull(buffer, "No buffer instance to 
serialize.");
 
-   int length = 16 + 4 + 1 + 4 + buffer.getSize();
+   int length = 16 + 4 + 4 + 1 + 4 + buffer.getSize();
--- End diff --

I think the lengths of the corresponding fields can be seen clearly in the 
write method below.


---


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188030#comment-16188030
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r142136764
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---
@@ -407,4 +419,72 @@ static void closeSilently(Socket socket, Logger LOG) {
private BlobUtils() {
throw new RuntimeException();
}
+
+   /**
+* Moves the temporary incomingFile to its permanent location 
where it is available for
+* use.
+*
+* @param incomingFile
+*  temporary file created during transfer
+* @param jobId
+*  ID of the job this blob belongs to or null if 
job-unrelated
+* @param blobKey
+*  BLOB key identifying the file
+* @param storageFile
+*  (local) file where the blob is/should be stored
+* @param writeLock
+*  lock to acquire before doing the move
+* @param log
+*  logger for debug information
+* @param blobStore
+*  HA store (or null if unavailable)
+*
+* @throws IOException
+*  thrown if an I/O error occurs while moving the file or 
uploading it to the HA store
+*/
+   static void moveTempFileToStore(
+   File incomingFile, @Nullable JobID jobId, BlobKey 
blobKey, File storageFile,
+   Lock writeLock, Logger log, @Nullable BlobStore 
blobStore) throws IOException {
+
+   writeLock.lock();
+
+   try {
+   // first check whether the file already exists
+   if (!storageFile.exists()) {
+   try {
+   // only move the file if it does not 
yet exist
+   Files.move(incomingFile.toPath(), 
storageFile.toPath());
+
+   incomingFile = null;
+
+   } catch (FileAlreadyExistsException ignored) {
+   log.warn("Detected concurrent file 
modifications. This should only happen if multiple" +
+   "BlobServer use the same 
storage directory.");
+   // we cannot be sure at this point 
whether the file has already been uploaded to the blob
+   // store or not. Even if the blobStore 
might shortly be in an inconsistent state, we have
--- End diff --

"we have to"


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should represent use cases for BLOBs that are 
> permanently stored for a job's lifetime (HA and non-HA).
> A {{TransientBlobStore}} should cover BLOB offloading for logs, RPC, etc., 
> which does not even have to be backed by files.
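
A hedged sketch of the proposed split ({{PermanentBlobService}} and 
{{getPermanentFile(JobID, BlobKey)}} appear in this PR's diffs; the exact 
method sets here are illustrative only):

{code}
import java.io.Closeable;
import java.io.File;
import java.io.IOException;

// Illustrative shapes only; JobID and BlobKey are the existing Flink types.
interface PermanentBlobService extends Closeable {
    // job-lifetime BLOBs, also mirrored in the HA blob store
    File getPermanentFile(JobID jobId, BlobKey key) throws IOException;
}

interface TransientBlobService extends Closeable {
    // short-lived BLOBs (logs, RPC offloading), never written to the HA store
    File getTransientFile(BlobKey key) throws IOException;
}
{code}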



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188031#comment-16188031
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r142128358
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---
@@ -18,89 +18,21 @@
 
 package org.apache.flink.runtime.blob;
 
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The BLOB cache implements a local cache for content-addressable BLOBs.
- *
- * When requesting BLOBs through the {@link BlobCache#getFile} methods, 
the
- * BLOB cache will first attempt to serve the file from its local cache. 
Only if
- * the local cache does not contain the desired BLOB, the BLOB cache will 
try to
- * download it from a distributed file system (if available) or the BLOB
- * server.
+ * The BLOB cache provides access to BLOB services for permanent and 
transient BLOBs.
  */
-public class BlobCache extends TimerTask implements BlobService {
-
-   /** The log object used for debugging. */
-   private static final Logger LOG = 
LoggerFactory.getLogger(BlobCache.class);
-
-   private final InetSocketAddress serverAddress;
-
-   /** Root directory for local file storage */
-   private final File storageDir;
-
-   /** Blob store for distributed file storage, e.g. in HA */
-   private final BlobView blobView;
-
-   private final AtomicBoolean shutdownRequested = new AtomicBoolean();
-
-   /** Shutdown hook thread to ensure deletion of the storage directory. */
-   private final Thread shutdownHook;
-
-   /** The number of retries when the transfer fails */
-   private final int numFetchRetries;
-
-   /** Configuration for the blob client like ssl parameters required to 
connect to the blob server */
-   private final Configuration blobClientConfig;
-
-   // 

-
-   /**
-* Job reference counters with a time-to-live (TTL).
-*/
-   private static class RefCount {
-   /**
-* Number of references to a job.
-*/
-   public int references = 0;
-   
-   /**
-* Timestamp in milliseconds when any job data should be 
cleaned up (no cleanup for
-* non-positive values).
-*/
-   public long keepUntil = -1;
-   }
-
-   /** Map to store the number of references to a specific job */
-   private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
+public class BlobCache implements BlobService {
--- End diff --

Could this be called `BlobServiceImpl`?


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should represent use cases for BLOBs that are 
> permanently stored for a job's lifetime (HA and non-HA).
> A {{TransientBlobStore}} should cover BLOB offloading for logs, RPC, etc., 
> which does not even have to be backed by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188032#comment-16188032
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r142138008
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java
 ---
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Provides a cache for permanent BLOB files including per-job ref-counting and a staged cleanup.
+ *
+ * When requesting BLOBs via {@link #getPermanentFile(JobID, BlobKey)}, the cache will first
+ * attempt to serve the file from its local cache. Only if the local cache does not contain the
+ * desired BLOB, it will try to download it from a distributed HA file system (if available) or
+ * the BLOB server.
+ *
+ * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup.
+ * Files may thus still be accessible upon recovery and do not need to be re-downloaded.
+ */
+public class PermanentBlobCache extends AbstractBlobCache implements PermanentBlobService {
+
+   /**
+* Job reference counters with a time-to-live (TTL).
+*/
+   @VisibleForTesting
+   static class RefCount {
+   /**
+* Number of references to a job.
+*/
+   public int references = 0;
+
+   /**
+* Timestamp in milliseconds when any job data should be cleaned up (no cleanup for
+* non-positive values).
+*/
+   public long keepUntil = -1;
+   }
+
+   /**
+* Map to store the number of references to a specific job.
+*/
+   private final Map<JobID, RefCount> jobRefCounters = new HashMap<>();
+
+   /**
+* Time interval (ms) to run the cleanup task; also used as the default TTL.
+*/
+   private final long cleanupInterval;
+
+   private final Timer cleanupTimer;
+
+   /**
+* Instantiates a new cache for permanent BLOBs which are also available in an HA store.
+*
+* @param serverAddress
+*  address of the {@link BlobServer} to use for fetching files from
+* @param blobClientConfig
+*  global configuration
+* @param blobView
+*  (distributed) HA blob store file system to retrieve files from first
+*
+* @throws IOException
+*  thrown if the (local or distributed) file storage cannot be created or is not usable
+*/
+   public PermanentBlobCache(
+   final InetSocketAddress serverAddress,
+   final Configuration blobClientConfig,
+   final BlobView blobView) throws IOException {
+
+   super(serverAddress, blobClientConfig, blobView,
+   LoggerFactory.getLogger(PermanentBlobCache.class));
+
+   // Initializing the clean up task
+   this.cleanupTimer = new Timer(true);
+
+   this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000;
+   
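
The staged cleanup described in the javadoc above reduces to ref-counting plus a TTL; a
standalone, simplified sketch of that idea (names are illustrative, not the actual
PermanentBlobCache code):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    // Simplified model of the staged cleanup: a job's files are removed only
    // once its reference count is zero AND its keep-until deadline has passed,
    // so they can still be served to a recovering job in the meantime.
    class RefCountedCleanup<K> {

        private static class RefCount {
            int references = 0;
            long keepUntil = -1; // non-positive: no cleanup scheduled
        }

        private final Map<K, RefCount> counters = new HashMap<>();
        private final long ttlMillis;

        RefCountedCleanup(long ttlMillis) {
            this.ttlMillis = ttlMillis;
        }

        synchronized void register(K job) {
            counters.computeIfAbsent(job, k -> new RefCount()).references++;
        }

        synchronized void release(K job) {
            RefCount rc = counters.get(job);
            if (rc != null && --rc.references == 0) {
                // defer deletion instead of deleting right away
                rc.keepUntil = System.currentTimeMillis() + ttlMillis;
            }
        }

        /** Run periodically (e.g. from the cleanup Timer); returns expired jobs. */
        synchronized List<K> sweep() {
            List<K> expired = new ArrayList<>();
            long now = System.currentTimeMillis();
            Iterator<Map.Entry<K, RefCount>> it = counters.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<K, RefCount> e = it.next();
                RefCount rc = e.getValue();
                if (rc.references == 0 && rc.keepUntil > 0 && rc.keepUntil <= now) {
                    expired.add(e.getKey());
                    it.remove();
                }
            }
            return expired;
        }
    }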

[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188039#comment-16188039
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r142126654
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.blob;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FileUtils;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Abstract base class for permanent and transient BLOB files.
+ */
+public abstract class AbstractBlobCache implements Closeable {
+
+   /**
+* The log object used for debugging.
+*/
+   protected final Logger LOG;
+
+   /**
+* Counter to generate unique names for temporary files.
+*/
+   protected final AtomicLong tempFileCounter = new AtomicLong(0);
+
+   protected final InetSocketAddress serverAddress;
+
+   /**
+* Root directory for local file storage.
+*/
+   protected final File storageDir;
+
+   /**
+* Blob store for distributed file storage, e.g. in HA.
+*/
+   protected final BlobView blobView;
+
+   protected final AtomicBoolean shutdownRequested = new AtomicBoolean();
+
+   /**
+* Shutdown hook thread to ensure deletion of the local storage directory.
+*/
+   protected final Thread shutdownHook;
+
+   /**
+* The number of retries when the transfer fails.
+*/
+   protected final int numFetchRetries;
+
+   /**
+* Configuration for the blob client like ssl parameters required to connect to the blob
+* server.
+*/
+   protected final Configuration blobClientConfig;
+
+   /**
+* Lock guarding concurrent file accesses.
+*/
+   protected final ReadWriteLock readWriteLock;
+
+   public AbstractBlobCache(
+   final InetSocketAddress serverAddress,
+   final Configuration blobClientConfig,
+   final BlobView blobView,
+   final Logger logger) throws IOException {
+
+   this.LOG = logger;
+
+   this.serverAddress = checkNotNull(serverAddress);
+   this.blobClientConfig = checkNotNull(blobClientConfig);
+   this.blobView = checkNotNull(blobView, "blobStore");
--- End diff --

`blobStore` != `blobView`
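
I.e., the message passed to `checkNotNull` should name the field the check actually
guards; presumably:

    this.blobView = checkNotNull(blobView, "blobView");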


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.

[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188036#comment-16188036
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r142138928
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 ---
@@ -468,7 +468,7 @@ class JobManager(
   taskManagerGateway match {
 case x: ActorTaskManagerGateway =>
   handleTaskManagerTerminated(x.getActorGateway().actor(), instance.getId)
-case _ => log.debug(s"Cannot remove resource ${resourceID}, because there is " +
+case _ => log.debug(s"Cannot remove reosurce ${resourceID}, because there is " +
--- End diff --

typo resource


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188026#comment-16188026
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r142129505
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -120,6 +125,76 @@ public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) t
}
}
 
+   /**
+* Downloads the given BLOB from the given server and stores its contents to a (local) file.
+*
+* Transient BLOB files are deleted after a successful copy of the server's data into the
+* given localJarFile.
+*
+* @param jobId
+*  job ID the BLOB belongs to or null if job-unrelated
+* @param blobKey
+*  BLOB key
+* @param localJarFile
+*  the local file to write to
+* @param serverAddress
+*  address of the server to download from
+* @param blobClientConfig
+*  client configuration for the connection
+* @param numFetchRetries
+*  number of retries before failing
+*
+* @throws IOException
+*  if an I/O error occurs during the download
+*/
+   static void downloadFromBlobServer(
+   @Nullable JobID jobId, BlobKey blobKey, File localJarFile,
+   InetSocketAddress serverAddress, Configuration blobClientConfig, int numFetchRetries)
+   throws IOException {
+
+   final byte[] buf = new byte[BUFFER_SIZE];
+   LOG.info("Downloading {}/{} from {}", jobId, blobKey, serverAddress);
+
+   // loop over retries
+   int attempt = 0;
+   while (true) {
+   try (
+   final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
+   final InputStream is = bc.getInternal(jobId, blobKey);
+   final OutputStream os = new FileOutputStream(localJarFile)
+   ) {
+   while (true) {
+   final int read = is.read(buf);
+   if (read < 0) {
+   break;
+   }
+   os.write(buf, 0, read);
+   }
+
+   return;
+   }
+   catch (Throwable t) {
+   String message = "Failed to fetch BLOB " + jobId + "/" + blobKey + " from " + serverAddress +
+   " and store it under " + localJarFile.getAbsolutePath();
+   if (attempt < numFetchRetries) {
+   if (LOG.isDebugEnabled()) {
+   LOG.debug(message + " Retrying...", t);
--- End diff --

Shouldn't this also be an error if the other log statement is an error as well?
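
For context, the retry pattern under discussion, reduced to a self-contained sketch
(simplified: the names, the buffer size and the omitted logging are illustrative, not
the actual BlobClient code):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    // Mirrors the retry structure of downloadFromBlobServer above: open fresh
    // streams on every attempt (so a failed attempt leaves no partial state in
    // the copy loop), and rethrow only once the retry budget is exhausted.
    final class RetryingCopy {

        interface IoSupplier<T> {
            T open() throws IOException;
        }

        static void copyWithRetries(
                IoSupplier<InputStream> source,
                IoSupplier<OutputStream> target,
                int numRetries) throws IOException {

            final byte[] buf = new byte[4096];
            for (int attempt = 0; ; attempt++) {
                try (InputStream is = source.open(); OutputStream os = target.open()) {
                    int read;
                    while ((read = is.read(buf)) != -1) {
                        os.write(buf, 0, read);
                    }
                    return; // success
                } catch (IOException e) {
                    // NB: per the review, the retry case and the final failure
                    // should be logged at consistent levels.
                    if (attempt >= numRetries) {
                        throw e;
                    }
                }
            }
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            copyWithRetries(
                () -> new ByteArrayInputStream("payload".getBytes()),
                () -> out,
                3);
            System.out.println(out); // prints "payload"
        }
    }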


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188029#comment-16188029
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r142125479
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 ---
@@ -158,9 +151,9 @@ protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed rou
cache = blobPortFuture.thenApplyAsync(
(Integer port) -> {
try {
-   return new BlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView);
+   return new TransientBlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config);
} catch (IOException e) {
-   throw new FlinkFutureException("Could not create BlobCache.", e);
+   throw new FlinkFutureException("Could not create TransientBlobCache.", e);
--- End diff --

After rebasing, this will be a `java.util.concurrent.CompletionException`.
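
A minimal illustration of that convention, independent of the Flink classes
(openCache is a hypothetical placeholder for the cache construction above):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;

    public class CompletionExceptionDemo {

        public static void main(String[] args) {
            CompletableFuture<Integer> blobPortFuture = CompletableFuture.completedFuture(1234);

            // Checked exceptions cannot escape the lambda passed to thenApplyAsync,
            // so they are wrapped; after the rebase, the idiomatic wrapper is
            // java.util.concurrent.CompletionException.
            CompletableFuture<String> cache = blobPortFuture.thenApplyAsync(port -> {
                try {
                    return openCache(port);
                } catch (Exception e) {
                    throw new CompletionException("Could not create TransientBlobCache.", e);
                }
            });

            System.out.println(cache.join()); // prints "cache@1234"
        }

        // hypothetical stand-in for `new TransientBlobCache(...)`
        private static String openCache(int port) throws Exception {
            return "cache@" + port;
        }
    }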


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188024#comment-16188024
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r142129292
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -120,6 +125,76 @@ public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) t
}
}
 
+   /**
+* Downloads the given BLOB from the given server and stores its contents to a (local) file.
+*
+* Transient BLOB files are deleted after a successful copy of the server's data into the
+* given localJarFile.
+*
+* @param jobId
+*  job ID the BLOB belongs to or null if job-unrelated
+* @param blobKey
+*  BLOB key
+* @param localJarFile
+*  the local file to write to
+* @param serverAddress
+*  address of the server to download from
+* @param blobClientConfig
+*  client configuration for the connection
+* @param numFetchRetries
+*  number of retries before failing
+*
+* @throws IOException
+*  if an I/O error occurs during the download
+*/
+   static void downloadFromBlobServer(
+   @Nullable JobID jobId, BlobKey blobKey, File localJarFile,
+   InetSocketAddress serverAddress, Configuration blobClientConfig, int numFetchRetries)
--- End diff --

Wrapping the parameters could be one per line.
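
I.e., presumably:

    static void downloadFromBlobServer(
            @Nullable JobID jobId,
            BlobKey blobKey,
            File localJarFile,
            InetSocketAddress serverAddress,
            Configuration blobClientConfig,
            int numFetchRetries) throws IOException {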


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs

2017-10-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188025#comment-16188025
 ] 

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4358#discussion_r142133777
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -375,7 +378,32 @@ public File getFile(BlobKey key) throws IOException {
 *  Thrown if the file retrieval failed.
 */
@Override
-   public File getFile(JobID jobId, BlobKey key) throws IOException {
+   public File getTransientFile(JobID jobId, BlobKey key) throws IOException {
+   checkNotNull(jobId);
+   return getFileInternal(jobId, key);
+   }
+
+   /**
+* Returns the path to a local copy of the file associated with the provided job ID and blob
+* key.
+* 
+* We will first attempt to serve the BLOB from the local storage. If the BLOB is not in
+* there, we will try to download it from the HA store.
+*
+* @param jobId
+*  ID of the job this blob belongs to
+* @param key
+*  blob key associated with the requested file
+*
+* @return The path to the file.
+*
+* @throws java.io.FileNotFoundException
+*  if the BLOB does not exist;
+* @throws IOException
+*  if any other error occurs when retrieving the file
+*/
+   @Override
+   public File getPermanentFile(JobID jobId, BlobKey key) throws IOException {
--- End diff --

Same here for `BlobType.PERMANENT`
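
Presumably the suggestion (made on the transient counterpart as well) is to route both
lookups through one internal method keyed by a blob type; a hypothetical sketch, where
BlobType and the extra getFileInternal parameter are assumptions rather than the PR's
actual code:

    // hypothetical shape of the suggestion, not the PR's actual code
    enum BlobType { PERMANENT, TRANSIENT }

    public File getTransientFile(JobID jobId, BlobKey key) throws IOException {
        checkNotNull(jobId);
        return getFileInternal(jobId, key, BlobType.TRANSIENT);
    }

    public File getPermanentFile(JobID jobId, BlobKey key) throws IOException {
        checkNotNull(jobId);
        return getFileInternal(jobId, key, BlobType.PERMANENT);
    }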


> change BlobService sub-classes for permanent and transient BLOBs
> 
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, Network
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are 
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. 
> which even does not have to be reflected by files.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

