[jira] [Commented] (FLINK-2013) Create generalized linear model framework
[ https://issues.apache.org/jira/browse/FLINK-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16189249#comment-16189249 ]

ASF GitHub Bot commented on FLINK-2013:
---------------------------------------

Github user mtunique commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3756#discussion_r142308699

    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala ---
    @@ -75,15 +75,16 @@ class MultipleLinearRegressionITSuite
         val parameters = ParameterMap()

    -    parameters.add(MultipleLinearRegression.Stepsize, 2.0)
    -    parameters.add(MultipleLinearRegression.Iterations, 10)
    -    parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
    +    parameters.add(WithIterativeSolver.Stepsize, 2.0)
    +    parameters.add(WithIterativeSolver.Iterations, 10)
    +    parameters.add(WithIterativeSolver.ConvergenceThreshold, 0.001)

         mlr.fit(sparseInputDS, parameters)

         val weightList = mlr.weightsOption.get.collect()

         val WeightVector(weights, intercept) = weightList.head
    +    println(weightList)
    --- End diff --

    Ok I will delete it.

> Create generalized linear model framework
> -----------------------------------------
>
>                 Key: FLINK-2013
>                 URL: https://issues.apache.org/jira/browse/FLINK-2013
>             Project: Flink
>          Issue Type: New Feature
>          Components: Machine Learning Library
>            Reporter: Theodore Vasiloudis
>            Assignee: Theodore Vasiloudis
>              Labels: ML
>
> [Generalized linear models|http://en.wikipedia.org/wiki/Generalized_linear_model] (GLMs) provide
> an abstraction for many learning models that can be used for regression and
> classification tasks.
> Some example GLMs are linear regression, logistic regression, LASSO and ridge
> regression.
> The goal for this JIRA is to provide interfaces for the set of common
> properties and functions these models share.
> In order to achieve that, a design pattern similar to the one that
> [sklearn|http://scikit-learn.org/stable/modules/linear_model.html] and
> [MLlib|http://spark.apache.org/docs/1.3.0/mllib-linear-methods.html] employ
> will be used.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[GitHub] flink pull request #3756: [FLINK-2013] Create generalized linear model frame...
Github user mtunique commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3756#discussion_r142308699

    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala ---
    @@ -75,15 +75,16 @@ class MultipleLinearRegressionITSuite
         val parameters = ParameterMap()

    -    parameters.add(MultipleLinearRegression.Stepsize, 2.0)
    -    parameters.add(MultipleLinearRegression.Iterations, 10)
    -    parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
    +    parameters.add(WithIterativeSolver.Stepsize, 2.0)
    +    parameters.add(WithIterativeSolver.Iterations, 10)
    +    parameters.add(WithIterativeSolver.ConvergenceThreshold, 0.001)

         mlr.fit(sparseInputDS, parameters)

         val weightList = mlr.weightsOption.get.collect()

         val WeightVector(weights, intercept) = weightList.head
    +    println(weightList)
    --- End diff --

    Ok I will delete it.

---
[jira] [Commented] (FLINK-2013) Create generalized linear model framework
[ https://issues.apache.org/jira/browse/FLINK-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16189241#comment-16189241 ]

ASF GitHub Bot commented on FLINK-2013:
---------------------------------------

Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3756#discussion_r142307430

    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala ---
    @@ -75,15 +75,16 @@ class MultipleLinearRegressionITSuite
         val parameters = ParameterMap()

    -    parameters.add(MultipleLinearRegression.Stepsize, 2.0)
    -    parameters.add(MultipleLinearRegression.Iterations, 10)
    -    parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
    +    parameters.add(WithIterativeSolver.Stepsize, 2.0)
    +    parameters.add(WithIterativeSolver.Iterations, 10)
    +    parameters.add(WithIterativeSolver.ConvergenceThreshold, 0.001)

         mlr.fit(sparseInputDS, parameters)

         val weightList = mlr.weightsOption.get.collect()

         val WeightVector(weights, intercept) = weightList.head
    +    println(weightList)
    --- End diff --

    remove this?
[jira] [Commented] (FLINK-2013) Create generalized linear model framework
[ https://issues.apache.org/jira/browse/FLINK-2013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16189242#comment-16189242 ]

ASF GitHub Bot commented on FLINK-2013:
---------------------------------------

Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/3756

    +1, LGTM
[GitHub] flink issue #3756: [FLINK-2013] Create generalized linear model framework
Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/3756

    +1, LGTM

---
[GitHub] flink pull request #3756: [FLINK-2013] Create generalized linear model frame...
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3756#discussion_r142307430

    --- Diff: flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala ---
    @@ -75,15 +75,16 @@ class MultipleLinearRegressionITSuite
         val parameters = ParameterMap()

    -    parameters.add(MultipleLinearRegression.Stepsize, 2.0)
    -    parameters.add(MultipleLinearRegression.Iterations, 10)
    -    parameters.add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
    +    parameters.add(WithIterativeSolver.Stepsize, 2.0)
    +    parameters.add(WithIterativeSolver.Iterations, 10)
    +    parameters.add(WithIterativeSolver.ConvergenceThreshold, 0.001)

         mlr.fit(sparseInputDS, parameters)

         val weightList = mlr.weightsOption.get.collect()

         val WeightVector(weights, intercept) = weightList.head
    +    println(weightList)
    --- End diff --

    remove this?

---
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16189176#comment-16189176 ]

ASF GitHub Bot commented on FLINK-7072:
---------------------------------------

Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4742#discussion_r142228578

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.client.deployment;
    +
    +import org.apache.flink.client.program.rest.RestClusterClient;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.JobManagerOptions;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +
    +/**
    + * A deployment descriptor for an existing cluster.
    + */
    +public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor {
    +
    +    private final Configuration config;
    +
    +    public Flip6StandaloneClusterDescriptor(Configuration config) {
    +        this.config = config;
    +    }
    +
    +    @Override
    +    public String getClusterDescription() {
    +        String host = config.getString(JobManagerOptions.ADDRESS, "");
    +        int port = config.getInteger(JobManagerOptions.PORT, -1);
    +        return "FLIP-6 Standalone cluster at " + host + ":" + port;
    +    }
    +
    +    @Override
    +    public RestClusterClient retrieve(String applicationID) {
    +        try {
    +            return new RestClusterClient(config);
    +        } catch (Exception e) {
    +            throw new RuntimeException("Couldn't retrieve standalone cluster", e);
    --- End diff --

    How about adding a `Flip6ClusterException` to wrap this exception?

> Create RESTful cluster endpoint
> -------------------------------
>
>                 Key: FLINK-7072
>                 URL: https://issues.apache.org/jira/browse/FLINK-7072
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Distributed Coordination
>            Reporter: Till Rohrmann
>            Assignee: Chesnay Schepler
>              Labels: flip-6
>             Fix For: 1.4.0
>
>
> In order to communicate with the cluster from the RESTful client, we have to
> implement a RESTful cluster endpoint. The endpoint shall support the
> following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session
> mode)
> * Get job status (GET): Get the status of an executed job (and maybe the
> JobExecutionResult)
> * Lookup job leader (GET): Gets the JM leader for the given job
> This endpoint will run in session mode alongside the dispatcher/session
> runner and forward calls to this component which maintains a view on all
> currently executed jobs.
> In the per-job mode, the endpoint will return only the single running job and
> the address of the JobManager alongside which it is running. Furthermore, it
> won't accept job submissions.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
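The operation list in the FLINK-7072 description can be read as a small table of HTTP verbs plus a mode restriction. The sketch below is only an illustration of that reading, not code from the pull request: the class, enum, and method names are hypothetical, no URL paths are assumed, and treating "get job status" and "lookup job leader" as available in per-job mode is an assumption (the description only states that per-job mode rejects job submissions).

```java
import java.util.ArrayList;
import java.util.List;

public class ClusterEndpointSketch {

    /** The two deployment modes named in the issue description. */
    public enum Mode { SESSION, PER_JOB }

    /** Operations and HTTP verbs as listed in the issue; the constant names are illustrative. */
    public enum Operation {
        LIST_JOBS("GET", false),
        SUBMIT_JOB("POST", true),      // "only supported in session mode"
        GET_JOB_STATUS("GET", false),  // per-job availability is an assumption
        LOOKUP_JOB_LEADER("GET", false);

        public final String httpMethod;
        public final boolean sessionOnly;

        Operation(String httpMethod, boolean sessionOnly) {
            this.httpMethod = httpMethod;
            this.sessionOnly = sessionOnly;
        }

        /** An operation is available unless it is session-only and the mode is per-job. */
        public boolean supportedIn(Mode mode) {
            return !sessionOnly || mode == Mode.SESSION;
        }
    }

    /** Returns the operations an endpoint would accept in the given mode. */
    public static List<Operation> supportedOperations(Mode mode) {
        List<Operation> result = new ArrayList<>();
        for (Operation op : Operation.values()) {
            if (op.supportedIn(mode)) {
                result.add(op);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + supportedOperations(mode));
        }
    }
}
```

Keeping the restriction as data on the enum, rather than as scattered `if` checks, makes the "session mode only" rule visible in one place.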
[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...
Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4742#discussion_r142228578

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.client.deployment;
    +
    +import org.apache.flink.client.program.rest.RestClusterClient;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.JobManagerOptions;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +
    +/**
    + * A deployment descriptor for an existing cluster.
    + */
    +public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor {
    +
    +    private final Configuration config;
    +
    +    public Flip6StandaloneClusterDescriptor(Configuration config) {
    +        this.config = config;
    +    }
    +
    +    @Override
    +    public String getClusterDescription() {
    +        String host = config.getString(JobManagerOptions.ADDRESS, "");
    +        int port = config.getInteger(JobManagerOptions.PORT, -1);
    +        return "FLIP-6 Standalone cluster at " + host + ":" + port;
    +    }
    +
    +    @Override
    +    public RestClusterClient retrieve(String applicationID) {
    +        try {
    +            return new RestClusterClient(config);
    +        } catch (Exception e) {
    +            throw new RuntimeException("Couldn't retrieve standalone cluster", e);
    --- End diff --

    How about adding a `Flip6ClusterException` to wrap this exception?

---
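One way to picture the reviewer's suggestion is a dedicated exception type that replaces the bare `RuntimeException` in `retrieve`. The sketch below is a guess at what that could look like, not the change that was made in the pull request: only the name `Flip6ClusterException` comes from the comment; making it unchecked (so the `retrieve` signature stays unchanged), the `ClientFactory` stand-in, and the enclosing class are all assumptions introduced for illustration.

```java
public class Flip6ClusterExceptionSketch {

    /**
     * Hypothetical exception type. Only the name comes from the review comment;
     * extending RuntimeException is an assumption that keeps callers' signatures unchanged.
     */
    public static class Flip6ClusterException extends RuntimeException {
        public Flip6ClusterException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Stand-in for code that may fail, such as constructing a cluster client. */
    public interface ClientFactory<T> {
        T create() throws Exception;
    }

    /** Wraps any failure in the dedicated exception type instead of a bare RuntimeException. */
    public static <T> T retrieve(ClientFactory<T> factory) {
        try {
            return factory.create();
        } catch (Exception e) {
            throw new Flip6ClusterException("Couldn't retrieve standalone cluster", e);
        }
    }

    public static void main(String[] args) {
        // Successful path: the factory's value is returned unchanged.
        System.out.println(retrieve(() -> "client"));

        // Failure path: the original exception is preserved as the cause.
        try {
            retrieve(() -> { throw new java.io.IOException("connection refused"); });
        } catch (Flip6ClusterException e) {
            System.out.println(e.getMessage() + " (cause: " + e.getCause().getMessage() + ")");
        }
    }
}
```

A dedicated type lets callers catch cluster-retrieval failures specifically, which a generic `RuntimeException` does not allow.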
[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss
[ https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16189049#comment-16189049 ]

Vijay Srinivasaraghavan commented on FLINK-7737:
------------------------------------------------

Some observations on the usage of hflush vs. hsync.

When an HCFS implementation is used as the backing filesystem, only hflush() is invoked, because hsync() is called only when the FSDataOutputStream is an instance of HdfsDataOutputStream. As a result, we are seeing some data loss when the bucketing sink is holding data in the pending state and tries to close the stream (as part of TM failover recovery).

I do not see any issue in adding another condition to include the hsync() call for HCFS types (FSDataOutputStream). [~rmetzger] Could you please take a look?

hflush() - This API flushes all outstanding data (i.e. the current unfinished packet) from the client into the OS buffers on all DataNode replicas.
hsync() - This API flushes the data to the DataNodes, like hflush(), but should also force the data to the underlying physical storage via fsync (or equivalent).

https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7737
>                 URL: https://issues.apache.org/jira/browse/FLINK-7737
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.3.2
>         Environment: Dev
>            Reporter: Ryan Hobbs
>
> During several tests where we simulated failure conditions, we have observed
> that on HCFS systems where the data stream is of type FSDataOutputStream,
> Flink will issue hflush() and not hsync(), which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the
> output stream is of type *HdfsDataOutputStream* but not for streams of type
> *FSDataOutputStream*. Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
>     try {
>         // At this point the refHflushOrSync cannot be null,
>         // since register method would have thrown if it was.
>         this.refHflushOrSync.invoke(os);
>         if (os instanceof HdfsDataOutputStream) {
>             ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>         }
>     } catch (InvocationTargetException e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e.getCause());
>         Throwable cause = e.getCause();
>         if (cause != null && cause instanceof IOException) {
>             throw (IOException) cause;
>         }
>         throw new RuntimeException(msg, e);
>     } catch (Exception e) {
>         String msg = "Error while trying to hflushOrSync!";
>         LOG.error(msg + " " + e);
>         throw new RuntimeException(msg, e);
>     }
> }
> {code}
> Could a potential fix be to perform a sync even on streams of type
> *FSDataOutputStream*?
> {code}
> if (os instanceof HdfsDataOutputStream) {
>     ((HdfsDataOutputStream) os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
>     os.hsync();
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
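The hflush()/hsync() distinction discussed in FLINK-7737 has a close analogue on a local filesystem, which may make the data-loss argument easier to follow. The sketch below is deliberately not Hadoop code: it uses only the JDK, where `flush()` hands bytes to the operating system and `FileChannel.force(true)` asks the OS to persist them to the storage device. The class and method names are hypothetical. As a side note on the proposed fix quoted above: because the parameter `os` is already declared as `FSDataOutputStream`, the `else if (os instanceof FSDataOutputStream)` test is true for every non-null argument, so it behaves like a plain `else`.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class FlushVsSyncSketch {

    /**
     * Writes the data, flushes it, then forces it to storage before returning.
     * Returns the number of bytes written.
     */
    public static int writeDurably(Path file, String data) throws IOException {
        byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
        try (FileOutputStream out = new FileOutputStream(file.toFile())) {
            out.write(bytes);
            // Rough analogue of hflush(): data leaves the application, but may still
            // sit in OS buffers. (For a raw FileOutputStream this call is a no-op;
            // it matters when a buffering wrapper sits in front of the stream.)
            out.flush();
            // Rough analogue of hsync(): ask the OS to push content and metadata
            // to the underlying device, similar to fsync.
            out.getChannel().force(true);
        }
        return bytes.length;
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("flush-vs-sync", ".txt");
        try {
            int written = writeDurably(file, "pending bucket data");
            System.out.println("wrote " + written + " bytes and forced them to storage");
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

If the process or machine fails after `flush()` but before `force(true)`, the bytes may be lost, which is the local counterpart of the failure mode the issue reports for streams that only receive hflush().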
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188860#comment-16188860 ] ASF GitHub Bot commented on FLINK-7072: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142258814 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; + +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; + +/** + * A {@link ClusterClient} implementation that communicates via HTTP REST requests. 
+ */ +public class RestClusterClient extends ClusterClient { + + private RestClusterClientConfiguration configuration; + private final RestClient restEndpoint; + + public RestClusterClient(Configuration config) throws Exception { + this(config, RestClusterClientConfiguration.fromConfiguration(config)); + } + + public RestClusterClient(Configuration config, RestClusterClientConfiguration configuration) throws Exception { + super(config); + this.configuration = configuration; + this.restEndpoint = new RestClient(configuration.getRestEndpointConfiguration(), Executors.newFixedThreadPool(4)); + } + + @Override + public void shutdown() { + this.restEndpoint.shutdown(Time.seconds(5)); + } + + @Override + protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { + log.info("Submitting job."); + try { + // temporary hack for FLIP-6 + jobGraph.setAllowQueuedScheduling(true); + submitJob(jobGraph); + } catch (JobSubmissionException e) { + throw new RuntimeException(e); + } + // don't return just a JobSubmissionResult here, the signature is lying + // The CliFrontend expects this to be a JobExecutionResult + + // TOOD: do not exit this method until job is finished + return new JobExecutionResult(jobGraph.getJobID(), 1, Collections.emptyMap()); + } + + private void
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188861#comment-16188861 ] ASF GitHub Bot commented on FLINK-7072: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142258856 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; + +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; + +/** + * A {@link ClusterClient} implementation that communicates via HTTP REST requests. 
+ */ +public class RestClusterClient extends ClusterClient { + + private RestClusterClientConfiguration configuration; + private final RestClient restEndpoint; + + public RestClusterClient(Configuration config) throws Exception { + this(config, RestClusterClientConfiguration.fromConfiguration(config)); + } + + public RestClusterClient(Configuration config, RestClusterClientConfiguration configuration) throws Exception { + super(config); + this.configuration = configuration; + this.restEndpoint = new RestClient(configuration.getRestEndpointConfiguration(), Executors.newFixedThreadPool(4)); + } + + @Override + public void shutdown() { + this.restEndpoint.shutdown(Time.seconds(5)); + } + + @Override + protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { + log.info("Submitting job."); + try { + // temporary hack for FLIP-6 + jobGraph.setAllowQueuedScheduling(true); + submitJob(jobGraph); + } catch (JobSubmissionException e) { + throw new RuntimeException(e); + } + // don't return just a JobSubmissionResult here, the signature is lying + // The CliFrontend expects this to be a JobExecutionResult + + // TOOD: do not exit this method until job is finished + return new JobExecutionResult(jobGraph.getJobID(), 1, Collections.emptyMap()); + } + + private void
[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142258856 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; + +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; + +/** + * A {@link ClusterClient} implementation that communicates via HTTP REST requests. 
+ */ +public class RestClusterClient extends ClusterClient { + + private RestClusterClientConfiguration configuration; + private final RestClient restEndpoint; + + public RestClusterClient(Configuration config) throws Exception { + this(config, RestClusterClientConfiguration.fromConfiguration(config)); + } + + public RestClusterClient(Configuration config, RestClusterClientConfiguration configuration) throws Exception { + super(config); + this.configuration = configuration; + this.restEndpoint = new RestClient(configuration.getRestEndpointConfiguration(), Executors.newFixedThreadPool(4)); + } + + @Override + public void shutdown() { + this.restEndpoint.shutdown(Time.seconds(5)); + } + + @Override + protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { + log.info("Submitting job."); + try { + // temporary hack for FLIP-6 + jobGraph.setAllowQueuedScheduling(true); + submitJob(jobGraph); + } catch (JobSubmissionException e) { + throw new RuntimeException(e); + } + // don't return just a JobSubmissionResult here, the signature is lying + // The CliFrontend expects this to be a JobExecutionResult + + // TOOD: do not exit this method until job is finished + return new JobExecutionResult(jobGraph.getJobID(), 1, Collections.emptyMap()); + } + + private void submitJob(JobGraph jobGraph) throws JobSubmissionException { + log.info("Requesting blob server port."); + int blobServerPort; + try { + CompletableFuture portFuture =
[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142258814 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ProgramInvocationException; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.blob.BlobClient; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.client.JobSubmissionException; +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; + +import java.net.InetSocketAddress; +import java.net.URL; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; + +/** + * A {@link ClusterClient} implementation that communicates via HTTP REST requests. 
+ */ +public class RestClusterClient extends ClusterClient { + + private RestClusterClientConfiguration configuration; + private final RestClient restEndpoint; + + public RestClusterClient(Configuration config) throws Exception { + this(config, RestClusterClientConfiguration.fromConfiguration(config)); + } + + public RestClusterClient(Configuration config, RestClusterClientConfiguration configuration) throws Exception { + super(config); + this.configuration = configuration; + this.restEndpoint = new RestClient(configuration.getRestEndpointConfiguration(), Executors.newFixedThreadPool(4)); + } + + @Override + public void shutdown() { + this.restEndpoint.shutdown(Time.seconds(5)); + } + + @Override + protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException { + log.info("Submitting job."); + try { + // temporary hack for FLIP-6 + jobGraph.setAllowQueuedScheduling(true); + submitJob(jobGraph); + } catch (JobSubmissionException e) { + throw new RuntimeException(e); + } + // don't return just a JobSubmissionResult here, the signature is lying + // The CliFrontend expects this to be a JobExecutionResult + + // TOOD: do not exit this method until job is finished + return new JobExecutionResult(jobGraph.getJobID(), 1, Collections.emptyMap()); + } + + private void submitJob(JobGraph jobGraph) throws JobSubmissionException { + log.info("Requesting blob server port."); + int blobServerPort; + try { + CompletableFuture portFuture =
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188850#comment-16188850 ]

ASF GitHub Bot commented on FLINK-7072:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4742#discussion_r142257641

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
    @@ -0,0 +1,206 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.client.program.rest;
    +
    +import org.apache.flink.api.common.JobExecutionResult;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.JobSubmissionResult;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.client.program.ClusterClient;
    +import org.apache.flink.client.program.ProgramInvocationException;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.blob.BlobClient;
    +import org.apache.flink.runtime.blob.BlobKey;
    +import org.apache.flink.runtime.client.JobSubmissionException;
    +import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +import org.apache.flink.runtime.rest.RestClient;
    +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
    +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
    +import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
    +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
    +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
    +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter;
    +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders;
    +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
    +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
    +
    +import java.net.InetSocketAddress;
    +import java.net.URL;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.Executors;
    +
    +/**
    + * A {@link ClusterClient} implementation that communicates via HTTP REST requests.
    + */
    +public class RestClusterClient extends ClusterClient {
    +
    +    private RestClusterClientConfiguration configuration;
    +    private final RestClient restEndpoint;
    +
    +    public RestClusterClient(Configuration config) throws Exception {
    +        this(config, RestClusterClientConfiguration.fromConfiguration(config));
    +    }
    +
    +    public RestClusterClient(Configuration config, RestClusterClientConfiguration configuration) throws Exception {
    +        super(config);
    +        this.configuration = configuration;
    +        this.restEndpoint = new RestClient(configuration.getRestEndpointConfiguration(), Executors.newFixedThreadPool(4));
    +    }
    +
    +    @Override
    +    public void shutdown() {
    +        this.restEndpoint.shutdown(Time.seconds(5));
    --- End diff --

    We don't need to call super. The ClusterClient implementation shuts down the HA services/actorSystemLoader, which we don't use.

> Create RESTful cluster endpoint
> -------------------------------
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Reporter: Till Rohrmann
> Assignee: Chesnay Schepler
> Labels: flip-6
> Fix For: 1.4.0
>
> In order to communicate with the cluster from the RESTful client, we have to implement a RESTful cluster endpoint. The endpoint shall support the following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in
[jira] [Commented] (FLINK-7371) user defined aggregator assumes nr of arguments smaller or equal than number of row fields
[ https://issues.apache.org/jira/browse/FLINK-7371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188843#comment-16188843 ]

ASF GitHub Bot commented on FLINK-7371:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4736#discussion_r142254410

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala ---
    @@ -85,6 +85,46 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       }
       @Test
    +  def testOverWindowWithConstant(): Unit = {
    +
    +    val data = List(
    +      (1L, 1, "Hello"),
    +      (2L, 2, "Hello"),
    +      (3L, 3, "Hello"),
    +      (4L, 4, "Hello"),
    +      (5L, 5, "Hello"),
    +      (6L, 6, "Hello"),
    +      (7L, 7, "Hello World"),
    +      (8L, 8, "Hello World"),
    +      (8L, 8, "Hello World"),
    +      (20L, 20, "Hello World"))
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(1)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    StreamITCase.clear
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
    +    val weightAvgFun = new WeightedAvg
    +
    +    val windowedTable = table
    +      .window(
    +        Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
    +      .select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg)
    +      .select('c, 'wAvg)
    --- End diff --

    can be removed

> user defined aggregator assumes nr of arguments smaller or equal than number of row fields
> ------------------------------------------------------------------------------------------
>
> Key: FLINK-7371
> URL: https://issues.apache.org/jira/browse/FLINK-7371
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.1
> Reporter: Stefano Bortoli
> Assignee: Timo Walther
>
> The definition of user define aggregations with a number of parameters larger than the row fields causes ArrayIndexOutOfBoundsException because the indexing is based on a linear iteration over row fields. This does not consider cases where fields can be used more than once and constant values are passed to the aggregation function.
> for example:
> {code}
> window(partition {} order by [2] rows between $5 PRECEDING and CURRENT ROW aggs [myAgg($0, $1, $3, $0, $4)])
> {code}
> where $3 and $4 are reference to constants, and $0 and $1 are fields causes:
> {code}
> java.lang.ArrayIndexOutOfBoundsException: 4
>     at org.apache.flink.table.plan.schema.RowSchema.mapIndex(RowSchema.scala:134)
>     at org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
>     at org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>     at org.apache.flink.table.plan.schema.RowSchema.mapAggregateCall(RowSchema.scala:147)
>     at org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate$$anonfun$9.apply(DataStreamOverAggregate.scala:362)
> {code}
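The failure mode described in the FLINK-7371 issue text can be illustrated with a small sketch. It assumes that argument indices below the row's field count refer to fields and that higher indices refer to a separate list of constants; that split is an assumption made for illustration. `AggArgResolver`, `resolve`, and the sample values are hypothetical names and do not come from Flink's `RowSchema` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Flink code: resolves aggregate argument indices
// against a row plus a list of constants instead of the row alone.
final class AggArgResolver {

    static List<Object> resolve(Object[] row, Object[] constants, int[] argIndices) {
        List<Object> args = new ArrayList<>();
        for (int idx : argIndices) {
            if (idx < row.length) {
                // Index addresses a row field; the same field may appear more than once.
                args.add(row[idx]);
            } else {
                // Index past the last field is treated as a constant reference (assumption).
                int constIdx = idx - row.length;
                if (constIdx >= constants.length) {
                    throw new IllegalArgumentException("No field or constant for index " + idx);
                }
                args.add(constants[constIdx]);
            }
        }
        return args;
    }

    public static void main(String[] args) {
        Object[] row = {1L, 1, "Hello"};     // $0, $1, $2
        Object[] constants = {42, "2"};      // $3, $4
        int[] argIndices = {0, 1, 3, 0, 4};  // myAgg($0, $1, $3, $0, $4)
        System.out.println(resolve(row, constants, argIndices));
    }
}
```

A lookup that indexes only into `row` would fail on index 4 here, which matches the shape of the reported `ArrayIndexOutOfBoundsException: 4`.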
[jira] [Commented] (FLINK-7371) user defined aggregator assumes nr of arguments smaller or equal than number of row fields
[ https://issues.apache.org/jira/browse/FLINK-7371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188842#comment-16188842 ]

ASF GitHub Bot commented on FLINK-7371:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4736#discussion_r142256742

    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java ---
    @@ -86,6 +86,13 @@ public Long getValue(WeightedAvgAccum accumulator) {
         }
         // overloaded accumulate method
    +    // dummy to test constants
    +    public void accumulate(WeightedAvgAccum accumulator, long iValue, int iWeight, int x, String string) {
    +        accumulator.sum += iWeight + Integer.parseInt(string);
    --- End diff --

    change the method to
    ```
    accumulator.sum += (iValue + Integer.parseInt(string)) * iWeight;
    accumulator.count += iWeight;
    ```
    to have some influence of the value of `string` in the result?
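The `accumulate` body proposed in this review comment can be tried out in isolation. The sketch below is a minimal stand-in under stated assumptions: the accumulator's field types (`long sum`, `int count`) and the integer-division `getValue` are guesses made for illustration, and `WeightedAvgSketch` is a hypothetical class, not the test utility from the pull request.

```java
// Hypothetical stand-in for the weighted-average test function; not Flink code.
final class WeightedAvgSketch {

    // Assumed accumulator layout: running weighted sum and total weight.
    static final class WeightedAvgAccum {
        long sum = 0;
        int count = 0;
    }

    // Body as suggested in the review: the constant `string` now shifts the
    // value before weighting, so it influences the result. `x` stays unused.
    static void accumulate(WeightedAvgAccum acc, long iValue, int iWeight, int x, String string) {
        acc.sum += (iValue + Integer.parseInt(string)) * iWeight;
        acc.count += iWeight;
    }

    // Returns null for an empty accumulator, otherwise the integer average.
    static Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null;
        }
        return acc.sum / acc.count;
    }

    public static void main(String[] args) {
        WeightedAvgAccum acc = new WeightedAvgAccum();
        // Mirrors the call shape weightAvgFun('a, 42, 'b, "2") for the row (1L, 1, "Hello").
        accumulate(acc, 1L, 42, 1, "2");
        System.out.println(getValue(acc));
    }
}
```

With the argument order used in the test (`'a, 42, 'b, "2"`), the constant `42` lands in `iWeight` and the field `'b` lands in the unused `x`, so every row carries the same weight in this sketch.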
[jira] [Commented] (FLINK-7371) user defined aggregator assumes nr of arguments smaller or equal than number of row fields
[ https://issues.apache.org/jira/browse/FLINK-7371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188844#comment-16188844 ]

ASF GitHub Bot commented on FLINK-7371:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4736#discussion_r142247401

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
    @@ -1670,4 +1670,34 @@ abstract class CodeGenerator(
         fieldTerm
       }
    +
    +  /**
    +    * Adds a reusable constant to the member area of the generated [[Function]].
    +    *
    +    * @param constant constant expression
    +    * @return member variable term
    +    */
    +  def addReusableBoxedConstant(constant: GeneratedExpression): String = {
    +    require(constant.literal, "Literal expected")
    +
    +    val fieldTerm = newName("constant")
    +
    +    val boxed = generateOutputFieldBoxing(constant)
    +    val boxedType = boxedTypeTermForTypeInfo(boxed.resultType)
    +
    +    val field =
    +      s"""
    +        |transient $boxedType $fieldTerm;
    --- End diff --

    why `transient`? Couldn't this be `final`?
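As background for the `transient` versus `final` question, the sketch below shows only the Java language-level difference between the two modifiers under standard Java serialization. Whether the generated function is ever Java-serialized is not stated in the thread, so this is not a statement about Flink's code generation. `ConstantFieldSketch` and `Holder` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical demo class; not generated Flink code.
final class ConstantFieldSketch {

    static final class Holder implements Serializable {
        private static final long serialVersionUID = 1L;
        // Skipped by Java serialization; must be re-initialized after deserialization.
        transient Integer transientConstant = 42;
        // Assigned exactly once and written to the serialized form like any other field.
        final Integer finalConstant = 42;
    }

    // Serializes and deserializes the holder in memory.
    static Holder roundTrip(Holder in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream input =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Holder) input.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Holder copy = roundTrip(new Holder());
        System.out.println("transient: " + copy.transientConstant);
        System.out.println("final: " + copy.finalConstant);
    }
}
```

After the round trip the `transient` field is `null`, because field initializers of a `Serializable` class are not re-run during deserialization, while the `final` field keeps its value.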
[GitHub] flink pull request #4736: [FLINK-7371] [table] Add support for constant para...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4736#discussion_r142254410 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala --- @@ -85,6 +85,46 @@ class OverWindowITCase extends StreamingWithStateTestBase { } @Test + def testOverWindowWithConstant(): Unit = { + +val data = List( + (1L, 1, "Hello"), + (2L, 2, "Hello"), + (3L, 3, "Hello"), + (4L, 4, "Hello"), + (5L, 5, "Hello"), + (6L, 6, "Hello"), + (7L, 7, "Hello World"), + (8L, 8, "Hello World"), + (8L, 8, "Hello World"), + (20L, 20, "Hello World")) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setParallelism(1) +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +StreamITCase.clear +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) +val weightAvgFun = new WeightedAvg + +val windowedTable = table + .window( +Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w) + .select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg) + .select('c, 'wAvg) --- End diff -- can be removed ---
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188830#comment-16188830 ]

ASF GitHub Bot commented on FLINK-7072:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4742#discussion_r142255730

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.client.deployment;
    +
    +import org.apache.flink.client.program.rest.RestClusterClient;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.JobManagerOptions;
    +import org.apache.flink.runtime.jobgraph.JobGraph;
    +
    +/**
    + * A deployment descriptor for an existing cluster.
    + */
    +public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor {
    +
    +    private final Configuration config;
    +
    +    public Flip6StandaloneClusterDescriptor(Configuration config) {
    +        this.config = config;
    +    }
    +
    +    @Override
    +    public String getClusterDescription() {
    +        String host = config.getString(JobManagerOptions.ADDRESS, "");
    +        int port = config.getInteger(JobManagerOptions.PORT, -1);
    +        return "FLIP-6 Standalone cluster at " + host + ":" + port;
    +    }
    +
    +    @Override
    +    public RestClusterClient retrieve(String applicationID) {
    --- End diff --

    it's always null.

> Create RESTful cluster endpoint
> -------------------------------
>
> Key: FLINK-7072
> URL: https://issues.apache.org/jira/browse/FLINK-7072
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Reporter: Till Rohrmann
> Assignee: Chesnay Schepler
> Labels: flip-6
> Fix For: 1.4.0
>
> In order to communicate with the cluster from the RESTful client, we have to implement a RESTful cluster endpoint. The endpoint shall support the following operations:
> * List jobs (GET): Get list of all running jobs on the cluster
> * Submit job (POST): Submit a job to the cluster (only supported in session mode)
> * Get job status (GET): Get the status of an executed job (and maybe the JobExecutionResult)
> * Lookup job leader (GET): Gets the JM leader for the given job
> This endpoint will run in session mode alongside the dispatcher/session runner and forward calls to this component which maintains a view on all currently executed jobs.
> In the per-job mode, the endpoint will return only the single running job and the address of the JobManager alongside which it is running. Furthermore, it won't accept job submissions.
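The four operations and the session versus per-job distinction from the FLINK-7072 description can be modelled in a few lines. The following is an in-memory sketch only: `ClusterEndpointSketch`, its method names, the `"RUNNING"`/`"UNKNOWN"` status strings, and the sample addresses are all hypothetical and are not Flink's REST API or handler classes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the endpoint operations; not Flink's API.
final class ClusterEndpointSketch {

    enum Mode { SESSION, PER_JOB }

    private final Mode mode;
    // jobId -> address of the JobManager leading that job
    private final Map<String, String> leaders = new LinkedHashMap<>();

    private ClusterEndpointSketch(Mode mode) {
        this.mode = mode;
    }

    static ClusterEndpointSketch session() {
        return new ClusterEndpointSketch(Mode.SESSION);
    }

    // Per-job mode knows exactly one job and its JobManager address up front.
    static ClusterEndpointSketch perJob(String jobId, String leaderAddress) {
        ClusterEndpointSketch endpoint = new ClusterEndpointSketch(Mode.PER_JOB);
        endpoint.leaders.put(jobId, leaderAddress);
        return endpoint;
    }

    // List jobs (GET)
    List<String> listJobs() {
        return new ArrayList<>(leaders.keySet());
    }

    // Submit job (POST): accepted in session mode only
    void submitJob(String jobId, String leaderAddress) {
        if (mode != Mode.SESSION) {
            throw new UnsupportedOperationException("job submission is only supported in session mode");
        }
        leaders.put(jobId, leaderAddress);
    }

    // Get job status (GET)
    String getJobStatus(String jobId) {
        return leaders.containsKey(jobId) ? "RUNNING" : "UNKNOWN";
    }

    // Lookup job leader (GET); returns null for an unknown job
    String lookupJobLeader(String jobId) {
        return leaders.get(jobId);
    }

    public static void main(String[] args) {
        ClusterEndpointSketch endpoint = session();
        endpoint.submitJob("job-1", "jm-1:6123");
        System.out.println(endpoint.listJobs());
    }
}
```

Rejecting submissions with an exception in per-job mode is one possible reading of "it won't accept job submissions"; a real endpoint would more likely answer with an HTTP error status.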
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188804#comment-16188804 ] ASF GitHub Bot commented on FLINK-7072: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142251500 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestServerEndpoint; +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import 
java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link RestClusterClient}. + */ +public class RestClusterClientTest { + + private static final String restAddress = "http://localhost:1234"; + private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class); + private static final GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + static { + when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress)); + when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway)); + } + + @Test + public void testABC() throws Exception { + + Configuration config = new Configuration(); + config.setString(JobManagerOptions.ADDRESS, "localhost"); + + RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config); + + TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler(); + TestJobSubmitHandler submitHandler = new TestJobSubmitHandler(); + TestJobTerminationHandler
[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142251500 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestServerEndpoint; +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import 
java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link RestClusterClient}. + */ +public class RestClusterClientTest { + + private static final String restAddress = "http://localhost:1234"; + private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class); + private static final GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + static { + when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress)); + when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway)); + } + + @Test + public void testABC() throws Exception { + + Configuration config = new Configuration(); + config.setString(JobManagerOptions.ADDRESS, "localhost"); + + RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config); + + TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler(); + TestJobSubmitHandler submitHandler = new TestJobSubmitHandler(); + TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler(); + + RestServerEndpoint rse = new RestServerEndpoint(rsec) { + @Override + protected
[jira] [Commented] (FLINK-7695) Port JobConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188767#comment-16188767 ] ASF GitHub Bot commented on FLINK-7695: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4737 > Port JobConfigHandler to new REST endpoint > -- > > Key: FLINK-7695 > URL: https://issues.apache.org/jira/browse/FLINK-7695 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > Fix For: 1.4.0 > > > Port the {{JobConfigHandler}} to the new {{RestServerEndpoint}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7668) Add AccessExecutionGraph refresh interval to ExecutionGraphHolder
[ https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188768#comment-16188768 ] ASF GitHub Bot commented on FLINK-7668: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4728 > Add AccessExecutionGraph refresh interval to ExecutionGraphHolder > - > > Key: FLINK-7668 > URL: https://issues.apache.org/jira/browse/FLINK-7668 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Once we support offline {{AccessExecutionGraph}} implementation (see > FLINK-7667) we should add a refresh interval to the {{ExecutionGraphHolder}} > after which the {{AccessExecutionGraph}} is retrieved again from the > {{JobMaster}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7668) Add AccessExecutionGraph refresh interval to ExecutionGraphHolder
[ https://issues.apache.org/jira/browse/FLINK-7668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7668. Resolution: Fixed Fix Version/s: 1.4.0 Added via aae417f113d0f9db3ac3b4cbadd378134f30b440
[GitHub] flink pull request #4728: [FLINK-7668] Add ExecutionGraphCache for Execution...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4728 ---
[jira] [Commented] (FLINK-7710) Port CheckpointStatsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188766#comment-16188766 ] ASF GitHub Bot commented on FLINK-7710: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4750 > Port CheckpointStatsHandler to new REST endpoint > > > Key: FLINK-7710 > URL: https://issues.apache.org/jira/browse/FLINK-7710 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{CheckpointStatsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7708) Port CheckpointConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188769#comment-16188769 ] ASF GitHub Bot commented on FLINK-7708: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4744 > Port CheckpointConfigHandler to new REST endpoint > - > > Key: FLINK-7708 > URL: https://issues.apache.org/jira/browse/FLINK-7708 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{CheckpointConfigHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4744: [FLINK-7708] [flip6] Add CheckpointConfigHandler f...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4744 ---
[GitHub] flink pull request #4750: [FLINK-7710] [flip6] Add CheckpointStatisticsHandl...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4750 ---
[GitHub] flink pull request #4737: [FLINK-7695] [flip6] Add JobConfigHandler for new ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4737 ---
[jira] [Closed] (FLINK-7695) Port JobConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-7695. Resolution: Fixed Fix Version/s: 1.4.0 Added via 172a64c1488bd6edda97473562c6871ae7f3364d
[jira] [Resolved] (FLINK-7710) Port CheckpointStatsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7710. Resolution: Fixed Added via ac82becd21b7766c18d16abfc7e08334c644507e
[jira] [Resolved] (FLINK-7708) Port CheckpointConfigHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-7708. Resolution: Fixed Added via b41f5a66cd6d62bf3c271f1d0bf9d8fa50a5d410
[jira] [Commented] (FLINK-7678) SQL UserDefineTableFunction does not take CompositeType input correctly
[ https://issues.apache.org/jira/browse/FLINK-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188727#comment-16188727 ] ASF GitHub Bot commented on FLINK-7678: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4726#discussion_r142238899 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala --- @@ -89,6 +90,14 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase { "Nullable(f0)", "Nullable(f0)", "42") + +// test row type input +testAllApis( + Func19('f11), + "Func19(f11)", + "Func19(f11)", + "12,true,1,2,3" --- End diff -- is the result of a scalar function automatically flattened? > SQL UserDefineTableFunction does not take CompositeType input correctly > --- > > Key: FLINK-7678 > URL: https://issues.apache.org/jira/browse/FLINK-7678 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.2 >Reporter: Rong Rong >Assignee: Timo Walther > > UDF is using FlinkTypeFactory to infer operand type while UDTF does not go > through the same code path. This results in: > {code:console} > org.apache.flink.table.api.ValidationException: SQL validation failed. From > line 1, column 38 to line 1, column 44: No match found for function signature > func() > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, > column 38 to line 1, column 44: No match found for function signature > func( ) > {code} > Please see github code for more info: > https://github.com/walterddr/flink/blob/bug_report/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/UDTFCompositeTypeTestFailure.scala -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7678) SQL UserDefineTableFunction does not take CompositeType input correctly
[ https://issues.apache.org/jira/browse/FLINK-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188729#comment-16188729 ] ASF GitHub Bot commented on FLINK-7678: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4726#discussion_r142238516 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala --- @@ -210,6 +213,31 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + @Test + def testRowType(): Unit = { +val row = Row.of( + 12.asInstanceOf[Integer], + true.asInstanceOf[JBoolean], + Row.of(1.asInstanceOf[Integer], 2.asInstanceOf[Integer], 3.asInstanceOf[Integer]) +) + +val rowType = Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT)) +val in = env.fromElements(row, row)(rowType).toTable(tEnv).as('a, 'b, 'c) + +val tableFunc4 = new TableFunc4() +val result = in + .join(tableFunc4('c) as ('f0, 'f1, 'f2)) + .select('c, 'f2) + +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList( + "1,2,3,3", --- End diff -- is this the correct result? Shouldn't `'c` remain nested? We did not ask to flatten it.
[jira] [Commented] (FLINK-7678) SQL UserDefineTableFunction does not take CompositeType input correctly
[ https://issues.apache.org/jira/browse/FLINK-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188728#comment-16188728 ] ASF GitHub Bot commented on FLINK-7678: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4726#discussion_r142237447 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala --- @@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp} import org.apache.flink.api.scala._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.{TableEnvironment, Types} --- End diff -- Undo?
[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4726#discussion_r142237447 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala --- @@ -22,7 +22,7 @@ import java.sql.{Date, Timestamp} import org.apache.flink.api.scala._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.{TableEnvironment, Types} --- End diff -- Undo? ---
[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4726#discussion_r142238899 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala --- @@ -89,6 +90,14 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase { "Nullable(f0)", "Nullable(f0)", "42") + +// test row type input +testAllApis( + Func19('f11), + "Func19(f11)", + "Func19(f11)", + "12,true,1,2,3" --- End diff -- is the result of a scalar function automatically flattened? ---
[GitHub] flink pull request #4726: [FLINK-7678] [table] Support composite inputs for ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4726#discussion_r142238516 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala --- @@ -210,6 +213,31 @@ class CorrelateITCase extends StreamingMultipleProgramsTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + @Test + def testRowType(): Unit = { +val row = Row.of( + 12.asInstanceOf[Integer], + true.asInstanceOf[JBoolean], + Row.of(1.asInstanceOf[Integer], 2.asInstanceOf[Integer], 3.asInstanceOf[Integer]) +) + +val rowType = Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT)) +val in = env.fromElements(row, row)(rowType).toTable(tEnv).as('a, 'b, 'c) + +val tableFunc4 = new TableFunc4() +val result = in + .join(tableFunc4('c) as ('f0, 'f1, 'f2)) + .select('c, 'f2) + +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = mutable.MutableList( + "1,2,3,3", --- End diff -- is this the correct result? Shouldn't `'c` remain nested? We did not ask to flatten it. ---
[jira] [Created] (FLINK-7755) Null values are not correctly handled by batch inner and outer joins
Fabian Hueske created FLINK-7755: Summary: Null values are not correctly handled by batch inner and outer joins Key: FLINK-7755 URL: https://issues.apache.org/jira/browse/FLINK-7755 Project: Flink Issue Type: Bug Components: Table API & SQL Affects Versions: 1.3.2, 1.4.0 Reporter: Fabian Hueske Priority: Blocker Fix For: 1.4.0, 1.3.3 Join predicates of batch joins are not correctly evaluated according to three-value logic. This affects inner as well as outer joins. The problem is that some equality predicates are only evaluated by the internal join algorithms of Flink, which are based on {{TypeComparator}}. The field {{TypeComparator}}s for {{Row}} are implemented such that {{null == null}} results in {{TRUE}} to ensure correct ordering and grouping. However, three-value logic requires that {{null == null}} results in {{UNKNOWN}} (or null). The code generator implements this logic correctly, but for equality predicates, no code is generated. For outer joins, the problem is a bit trickier because these do not support code-generated predicates yet (see FLINK-5520). FLINK-5498 proposes a solution for this issue. We also need to extend several of the existing tests and add null values to ensure that the join logic is correctly implemented. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
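To make the mismatch described in FLINK-7755 concrete, here is a minimal stand-alone sketch in plain Java. It is not Flink code: the class `NullEquality` and its methods are hypothetical names chosen for illustration. `comparatorEquals` mimics a comparator that treats two nulls as equal (which is what ordering and grouping need), while `sqlEquals` follows three-value logic and models UNKNOWN as a `null` result. A join should emit a pair only when the predicate is TRUE.

```java
import java.util.Objects;

/** Hypothetical illustration of the two equality semantics; not Flink code. */
final class NullEquality {

    /** Comparator-style equality: null == null is TRUE (suitable for sorting and grouping). */
    static boolean comparatorEquals(Integer left, Integer right) {
        return Objects.equals(left, right);
    }

    /** SQL three-value logic: any comparison involving null is UNKNOWN, modelled here as null. */
    static Boolean sqlEquals(Integer left, Integer right) {
        if (left == null || right == null) {
            return null; // UNKNOWN
        }
        return left.equals(right);
    }

    /** A join emits a pair only if the predicate is TRUE; UNKNOWN is filtered out like FALSE. */
    static boolean joinMatches(Integer left, Integer right) {
        return Boolean.TRUE.equals(sqlEquals(left, right));
    }

    public static void main(String[] args) {
        System.out.println(comparatorEquals(null, null)); // comparator semantics
        System.out.println(sqlEquals(null, null));        // three-value logic
        System.out.println(joinMatches(null, null));      // what a join should decide
    }
}
```

If a join relies only on the comparator-style check, rows whose keys are both null are paired up, which is the behaviour the issue reports as incorrect.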
[jira] [Created] (FLINK-7754) Complete termination future after actor has been stopped.
Till Rohrmann created FLINK-7754: Summary: Complete termination future after actor has been stopped. Key: FLINK-7754 URL: https://issues.apache.org/jira/browse/FLINK-7754 Project: Flink Issue Type: Sub-task Components: Distributed Coordination Affects Versions: 1.4.0 Reporter: Till Rohrmann Assignee: Till Rohrmann At the moment, we complete the termination future when the {{postStop}} method of the {{RpcActor}} has been executed. This, however, does not mean that the underlying actor has been stopped. We should rather complete the future in the {{AkkaRpcService#stopServer}} method where we close the actor with a graceful shutdown. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
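The ordering proposed in FLINK-7754 can be sketched with plain `CompletableFuture`s. This is only an illustration under stated assumptions: `FakeActor` and `FakeRpcService` are hypothetical stand-ins, not the `RpcActor` or `AkkaRpcService` classes, and the graceful stop is simulated synchronously. The point is that the termination future is completed from the stop call's completion callback, not from inside the `postStop` hook.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for an actor; it only records the order of lifecycle events. */
final class FakeActor {
    final List<String> events = new ArrayList<>();

    /** Lifecycle hook that runs while the actor is still shutting down. */
    void postStop() {
        events.add("postStop");
    }

    /** Simulates a graceful stop: the hook runs first, only afterwards is the actor gone. */
    CompletableFuture<Void> gracefulStop() {
        postStop();
        events.add("actorStopped");
        return CompletableFuture.completedFuture(null);
    }
}

/** Hypothetical service that owns the termination future instead of the actor. */
final class FakeRpcService {
    final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    /** Completes the termination future only once the graceful stop has finished. */
    void stopServer(FakeActor actor) {
        actor.gracefulStop().whenComplete((ignored, failure) -> {
            actor.events.add("terminationFutureCompleted");
            if (failure != null) {
                terminationFuture.completeExceptionally(failure);
            } else {
                terminationFuture.complete(null);
            }
        });
    }
}
```

With this arrangement, anything waiting on the termination future observes it only after the stop itself has completed, which is the guarantee the issue asks for.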
[jira] [Created] (FLINK-7753) HandlerUtils should close the channel on error responses
Eron Wright created FLINK-7753: --- Summary: HandlerUtils should close the channel on error responses Key: FLINK-7753 URL: https://issues.apache.org/jira/browse/FLINK-7753 Project: Flink Issue Type: Sub-task Reporter: Eron Wright Assignee: Eron Wright Priority: Minor Unexpected errors in the server pipeline correctly cause a 500 error response. I suggest that such responses also close the channel rather than allowing keep-alive. This would be a better security posture too since we don't know if the pipeline is corrupt following an unexpected error. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7752) RedirectHandler should execute on the IO thread
Eron Wright created FLINK-7752: --- Summary: RedirectHandler should execute on the IO thread Key: FLINK-7752 URL: https://issues.apache.org/jira/browse/FLINK-7752 Project: Flink Issue Type: Sub-task Reporter: Eron Wright Priority: Minor The redirect handler executes much of its logic (including 'respondAsLeader') on an arbitrary thread, but it would be cleaner to execute on the I/O thread by using `channelHandlerContext.executor()`. Note that Netty allows writes on any thread but reads on the I/O thread. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
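The thread hop suggested in FLINK-7752 can be sketched without Netty. In the sketch below, a single-threaded `ExecutorService` is an assumed stand-in for the executor returned by `channelHandlerContext.executor()`; `IoThreadHop`, `handleOnIoThread` and the `"leader-address"` value are hypothetical names for illustration. The lookup completes on an arbitrary pool thread, and `thenApplyAsync` with an explicit executor moves the follow-up logic onto the designated thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch; the single-threaded executor stands in for the channel's I/O thread. */
final class IoThreadHop {

    /**
     * Resolves a value on an arbitrary thread, then handles it on ioExecutor.
     * Returns the name of the thread that ran the handling step.
     */
    static String handleOnIoThread(ExecutorService ioExecutor) {
        CompletableFuture<String> leaderLookup =
            CompletableFuture.supplyAsync(() -> "leader-address"); // runs on a common-pool thread

        return leaderLookup
            // The explicit executor guarantees that this stage runs on the designated thread.
            .thenApplyAsync(address -> Thread.currentThread().getName(), ioExecutor)
            .join();
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor =
            Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "io-thread"));
        try {
            System.out.println(handleOnIoThread(ioExecutor));
        } finally {
            ioExecutor.shutdown();
        }
    }
}
```

In a real handler the same pattern would pass the context's executor as the second argument, so that the response logic always runs on the channel's own thread.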
[jira] [Commented] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188511#comment-16188511 ] ASF GitHub Bot commented on FLINK-7709: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4763

[FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

## What is the purpose of the change

Adds the new `CheckpointStatisticDetailsHandler` for the new REST server endpoint. Moreover, this PR disables the `FAIL_ON_MISSING_CREATOR_PROPERTIES` property for the `RestMapperUtils.getStrictObjectMapper` because that is something the individual beans can do on their own (e.g. by checking with `Preconditions.checkNotNull`).

R @zentol because of the changes to the `ObjectMapper` setup.

## Verifying this change

This change is already covered by existing tests, such as `CheckpointingStatisticsTest`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink portCheckpointStatsDetailsHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4763.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #4763

commit c09e579b26752f2ae477d60e8fbb6ccdce315df9
Author: Till Rohrmann
Date: 2017-09-25T13:29:59Z

[FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST handlers

The ExecutionGraphCache replaces the ExecutionGraphHolder. Unlike the latter, the former does not expect the AccessExecutionGraph to be the true ExecutionGraph. Instead it assumes it to be the ArchivedExecutionGraph. Therefore, it invalidates the cache entries after a given time to live period. This will trigger requesting the AccessExecutionGraph again and, thus, updating the ExecutionGraph information for the ExecutionGraph based REST handlers.

In order to avoid memory leaks, the WebRuntimeMonitor now starts a periodic cleanup task which triggers ExecutionGraphCache.cleanup. This method releases all cache entries which have exceeded their time to live. Currently it is set to 20 * refreshInterval of the web gui.

This closes #4728.

commit a4f9ef81c02738b40cf0ba375650684b46f5417d
Author: Till Rohrmann
Date: 2017-09-26T16:39:15Z

[FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint

This closes #4737.

commit 4259fcc96c7c72644806941bd0df9f508a2f0bcd
Author: Till Rohrmann
Date: 2017-09-28T16:35:50Z

[FLINK-7708] [flip6] Add CheckpointConfigHandler for new REST endpoint

This commit implements the CheckpointConfigHandler which now returns a CheckpointConfigInfo object if checkpointing is enabled. In case that checkpointing is disabled for a job, it will return a 404 response.

commit 6f1756b541ad76521c7b653f834c1032c623f1e6
Author: Till Rohrmann
Date: 2017-09-29T13:09:06Z

[FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST endpoint

This commit also makes the CheckpointStatsHistory object serializable by removing the CheckpointStatsHistoryIterable and replacing it with a static ArrayList.

commit bd8109b0af1a90fc32f76e57261a252762d678eb
Author: Till Rohrmann
Date: 2017-10-02T17:39:38Z

[FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

commit 4d694411e9e4b6c508c258655c9c69cb26ddb6be
Author: Till Rohrmann
Date: 2017-10-02T17:52:13Z

Disable failing when not all creator properties are known

> Port CheckpointStatsDetailsHandler to new REST endpoint > --- > > Key: FLINK-7709 > URL: https://issues.apache.org/jira/browse/FLINK-7709 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 >
[GitHub] flink pull request #4763: [FLINK-7709] Add CheckpointStatisticDetailsHandler...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4763

[FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

## What is the purpose of the change

Adds the new `CheckpointStatisticDetailsHandler` for the new REST server endpoint. Moreover, this PR disables the `FAIL_ON_MISSING_CREATOR_PROPERTIES` property for the `RestMapperUtils.getStrictObjectMapper` because that is something the individual beans can do on their own (e.g. by checking with `Preconditions.checkNotNull`).

R @zentol because of the changes to the `ObjectMapper` setup.

## Verifying this change

This change is already covered by existing tests, such as `CheckpointingStatisticsTest`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink portCheckpointStatsDetailsHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4763.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #4763

commit c09e579b26752f2ae477d60e8fbb6ccdce315df9
Author: Till Rohrmann
Date: 2017-09-25T13:29:59Z

[FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST handlers

The ExecutionGraphCache replaces the ExecutionGraphHolder. Unlike the latter, the former does not expect the AccessExecutionGraph to be the true ExecutionGraph. Instead it assumes it to be the ArchivedExecutionGraph. Therefore, it invalidates the cache entries after a given time to live period. This will trigger requesting the AccessExecutionGraph again and, thus, updating the ExecutionGraph information for the ExecutionGraph based REST handlers.

In order to avoid memory leaks, the WebRuntimeMonitor now starts a periodic cleanup task which triggers ExecutionGraphCache.cleanup. This method releases all cache entries which have exceeded their time to live. Currently it is set to 20 * refreshInterval of the web gui.

This closes #4728.

commit a4f9ef81c02738b40cf0ba375650684b46f5417d
Author: Till Rohrmann
Date: 2017-09-26T16:39:15Z

[FLINK-7695] [flip6] Add JobConfigHandler for new RestServerEndpoint

This closes #4737.

commit 4259fcc96c7c72644806941bd0df9f508a2f0bcd
Author: Till Rohrmann
Date: 2017-09-28T16:35:50Z

[FLINK-7708] [flip6] Add CheckpointConfigHandler for new REST endpoint

This commit implements the CheckpointConfigHandler which now returns a CheckpointConfigInfo object if checkpointing is enabled. In case that checkpointing is disabled for a job, it will return a 404 response.

commit 6f1756b541ad76521c7b653f834c1032c623f1e6
Author: Till Rohrmann
Date: 2017-09-29T13:09:06Z

[FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST endpoint

This commit also makes the CheckpointStatsHistory object serializable by removing the CheckpointStatsHistoryIterable and replacing it with a static ArrayList.

commit bd8109b0af1a90fc32f76e57261a252762d678eb
Author: Till Rohrmann
Date: 2017-10-02T17:39:38Z

[FLINK-7709] Add CheckpointStatisticDetailsHandler for new REST endpoint

commit 4d694411e9e4b6c508c258655c9c69cb26ddb6be
Author: Till Rohrmann
Date: 2017-10-02T17:52:13Z

Disable failing when not all creator properties are known

---
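The FLINK-7668 commit message above describes a cache whose entries expire after a time to live and are released by a periodic cleanup. A minimal sketch of that idea follows. It is not the `ExecutionGraphCache` implementation: `TtlCache`, its method names and the injected clock are assumptions made for illustration, and the loader function stands in for re-requesting the `AccessExecutionGraph`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical time-to-live cache; a sketch of the idea, not the ExecutionGraphCache itself. */
final class TtlCache<K, V> {

    /** A cached value together with the time at which it stops being valid. */
    private static final class Entry<V> {
        final V value;
        final long expiresAt;

        Entry(V value, long expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<K, Entry<V>> entries = new ConcurrentHashMap<>();
    private final long timeToLiveMillis;
    private final LongSupplier clock; // injected so that callers and tests control time

    TtlCache(long timeToLiveMillis, LongSupplier clock) {
        this.timeToLiveMillis = timeToLiveMillis;
        this.clock = clock;
    }

    /** Returns the cached value, or reloads it if the entry is missing or past its time to live. */
    V get(K key, Function<K, V> loader) {
        long now = clock.getAsLong();
        Entry<V> entry = entries.compute(key, (k, current) ->
            current != null && now < current.expiresAt
                ? current
                : new Entry<V>(loader.apply(k), now + timeToLiveMillis));
        return entry.value;
    }

    /** Drops all expired entries; meant to be called periodically to bound memory use. */
    void cleanup() {
        long now = clock.getAsLong();
        entries.values().removeIf(entry -> now >= entry.expiresAt);
    }

    /** Number of entries currently held, including expired ones not yet cleaned up. */
    int size() {
        return entries.size();
    }
}
```

Expiry on read keeps the served data reasonably fresh, while the separate `cleanup` pass covers keys that are never requested again, which is the memory-leak concern the commit message mentions. A scheduler such as `ScheduledExecutorService.scheduleAtFixedRate` could drive the periodic call.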
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188491#comment-16188491 ] ASF GitHub Bot commented on FLINK-7072: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142207196 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestServerEndpoint; +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import 
java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link RestClusterClient}. + */ +public class RestClusterClientTest { + + private static final String restAddress = "http://localhost:1234"; + private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class); + private static final GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + static { + when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress)); + when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway)); + } + + @Test + public void testABC() throws Exception { + + Configuration config = new Configuration(); + config.setString(JobManagerOptions.ADDRESS, "localhost"); + + RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config); + + TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler(); + TestJobSubmitHandler submitHandler = new TestJobSubmitHandler(); +
[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142207196 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestServerEndpoint; +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import 
java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link RestClusterClient}. + */ +public class RestClusterClientTest { + + private static final String restAddress = "http://localhost:1234"; + private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class); + private static final GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + static { + when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress)); + when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway)); + } + + @Test + public void testABC() throws Exception { + + Configuration config = new Configuration(); + config.setString(JobManagerOptions.ADDRESS, "localhost"); + + RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config); + + TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler(); + TestJobSubmitHandler submitHandler = new TestJobSubmitHandler(); + TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler(); + + RestServerEndpoint rse = new RestServerEndpoint(rsec) { + @Override + protected
[GitHub] flink pull request #4732: [FLINK-7426] [table] Support null values in keys
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4732#discussion_r142165496 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * Null-aware key selector. + */ +class CRowKeySelector( +val keyFields: Array[Int], +@transient var returnType: TypeInformation[Row]) + extends KeySelector[CRow, Row] + with ResultTypeQueryable[Row] { + + override def getKey(value: CRow): Row = { +val row = value.row +val fields = keyFields + +val newKey = new Row(fields.length) +var i = 0 +while (i < fields.length) { + newKey.setField(i, row.getField(fields(i))) --- End diff -- should we add a `project(int[])` method to `Row` similar to `Row.copy()` to reduce the number of method calls? ---
[jira] [Commented] (FLINK-7426) Table API does not support null values in keys
[ https://issues.apache.org/jira/browse/FLINK-7426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188421#comment-16188421 ] ASF GitHub Bot commented on FLINK-7426: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4732#discussion_r142167389 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * Null-aware key selector. 
+ */ +class CRowKeySelector( +val keyFields: Array[Int], +@transient var returnType: TypeInformation[Row]) + extends KeySelector[CRow, Row] + with ResultTypeQueryable[Row] { + + override def getKey(value: CRow): Row = { +val row = value.row +val fields = keyFields + +val newKey = new Row(fields.length) +var i = 0 +while (i < fields.length) { + newKey.setField(i, row.getField(fields(i))) --- End diff -- if we do that, we can also make `Row.copy()` more efficient by adding a private constructor that receives an `Object[]` and implementing `copy()` as `return new Row(Arrays.copyOf(row.fields, row.fields.length));` > Table API does not support null values in keys > -- > > Key: FLINK-7426 > URL: https://issues.apache.org/jira/browse/FLINK-7426 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.2 >Reporter: Timo Walther >Assignee: Timo Walther > > The Table API uses {{keyBy}} internally, however, the generated > {{KeySelector}} uses instances of {{Tuple}}. The {{TupleSerializer}} is not > able to serialize null values. This causes issues during checkpointing or > when using the RocksDB state backend. We need to replace all {{keyBy}} calls > with a custom {{RowKeySelector}}. 
> {code} > class AggregateITCase extends StreamingWithStateTestBase { > private val queryConfig = new StreamQueryConfig() > queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2)) > @Test > def testDistinct(): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.setStateBackend(getStateBackend) > val tEnv = TableEnvironment.getTableEnvironment(env) > StreamITCase.clear > val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) > .select('b, Null(Types.LONG)).distinct() > val results = t.toRetractStream[Row](queryConfig) > results.addSink(new StreamITCase.RetractingSink).setParallelism(1) > env.execute() > val expected = mutable.MutableList("1,null", "2,null", "3,null", > "4,null", "5,null", "6,null") > assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
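The two review suggestions above (a `project(int[])` method on `Row`, and a `copy()` built on `Arrays.copyOf` through a private array-taking constructor) could look roughly like the sketch below. `SimpleRow` is a hypothetical stand-in written for this illustration; it is not Flink's `org.apache.flink.types.Row`, and the method names simply follow the wording of the review comments.

```java
import java.util.Arrays;

/** Hypothetical row type used only for illustration; not Flink's Row class. */
final class SimpleRow {

    private final Object[] fields;

    /** Creates a row of the given arity with all fields set to null. */
    SimpleRow(int arity) {
        this.fields = new Object[arity];
    }

    /** Private constructor that adopts the given array without copying it. */
    private SimpleRow(Object[] fields) {
        this.fields = fields;
    }

    Object getField(int pos) {
        return fields[pos];
    }

    void setField(int pos, Object value) {
        fields[pos] = value;
    }

    int getArity() {
        return fields.length;
    }

    /** Shallow copy: one array copy instead of one getField/setField pair per field. */
    static SimpleRow copy(SimpleRow row) {
        return new SimpleRow(Arrays.copyOf(row.fields, row.fields.length));
    }

    /** Builds a new row from the fields at the given positions; null fields are kept as null. */
    static SimpleRow project(SimpleRow row, int[] positions) {
        Object[] projected = new Object[positions.length];
        for (int i = 0; i < positions.length; i++) {
            projected[i] = row.fields[positions[i]];
        }
        return new SimpleRow(projected);
    }

    public static void main(String[] args) {
        SimpleRow row = new SimpleRow(3);
        row.setField(0, 1L);
        row.setField(2, "c");
        SimpleRow key = SimpleRow.project(row, new int[] {2, 1});
        System.out.println(key.getField(0) + "," + key.getField(1));
    }
}
```

With such a method, the `while` loop in `getKey` would reduce to a single projection call over `keyFields`. Because the key is a plain object array, a null field simply stays null, which is the property the issue asks for.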
[jira] [Commented] (FLINK-7426) Table API does not support null values in keys
[ https://issues.apache.org/jira/browse/FLINK-7426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188419#comment-16188419 ] ASF GitHub Bot commented on FLINK-7426: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4732#discussion_r142164921 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * Null-aware key selector. + */ +class CRowKeySelector( +val keyFields: Array[Int], +@transient var returnType: TypeInformation[Row]) + extends KeySelector[CRow, Row] + with ResultTypeQueryable[Row] { + + override def getKey(value: CRow): Row = { +val row = value.row +val fields = keyFields --- End diff -- Why not use `keyFields` directly? 
> Table API does not support null values in keys > -- > > Key: FLINK-7426 > URL: https://issues.apache.org/jira/browse/FLINK-7426 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.3.2 >Reporter: Timo Walther >Assignee: Timo Walther > > The Table API uses {{keyBy}} internally, however, the generated > {{KeySelector}} uses instances of {{Tuple}}. The {{TupleSerializer}} is not > able to serialize null values. This causes issues during checkpointing or > when using the RocksDB state backend. We need to replace all {{keyBy}} calls > with a custom {{RowKeySelector}}. > {code} > class AggregateITCase extends StreamingWithStateTestBase { > private val queryConfig = new StreamQueryConfig() > queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2)) > @Test > def testDistinct(): Unit = { > val env = StreamExecutionEnvironment.getExecutionEnvironment > env.setStateBackend(getStateBackend) > val tEnv = TableEnvironment.getTableEnvironment(env) > StreamITCase.clear > val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) > .select('b, Null(Types.LONG)).distinct() > val results = t.toRetractStream[Row](queryConfig) > results.addSink(new StreamITCase.RetractingSink).setParallelism(1) > env.execute() > val expected = mutable.MutableList("1,null", "2,null", "3,null", > "4,null", "5,null", "6,null") > assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7749) remove the ResultPartitionWriter
[ https://issues.apache.org/jira/browse/FLINK-7749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188308#comment-16188308 ]

ASF GitHub Bot commented on FLINK-7749:
---

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/4762

    [FLINK-7749][network] remove the ResultPartitionWriter wrapper

## What is the purpose of the change

After changing task event notification to the `TaskEventDispatcher` (#4759) and buffer writing to `ResultPartition` (#4761) this PR finally removes the `ResultPartitionWriter` wrapper.

## Brief change log

- remove `ResultPartitionWriter` and adapt all classes to use `ResultPartition` instead

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (yes)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-7749

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4762.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4762

commit d0cf483e0fb21010ed996935ffb14d62b34ee8ed
Author: Nico Kruber
Date: 2017-08-29T15:32:52Z

    [FLINK-7746][network] move ResultPartitionWriter#writeBufferToAllChannels implementation up into ResultPartition

commit cfea094aa214612eaafef14554bc68626a6ff948
Author: Nico Kruber
Date: 2017-08-29T16:24:00Z

    [FLINK-7748][network] properly use the TaskEventDispatcher for subscribing to events

    Previously, the ResultPartitionWriter implemented the EventListener interface and was used for event registration, although event publishing was already handled via the TaskEventDispatcher. Now, we use the TaskEventDispatcher for both event registration and publishing.

    It also adds the TaskEventDispatcher to the Environment information for a task to be able to work with it (only IterationHeadTask so far).

commit 88a0d0efc3e4daabbb6ac50ca0d5fa0481a333b6
Author: Nico Kruber
Date: 2017-08-29T16:53:48Z

    [FLINK-7749][network] remove the ResultPartitionWriter wrapper

    Previous tasks, i.e. task event notification and buffer writing, are now handled completely by the TaskEventDispatcher and the ResultPartition, respectively.

> remove the ResultPartitionWriter
>
> Key: FLINK-7749
> URL: https://issues.apache.org/jira/browse/FLINK-7749
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> After changing task event notification to the {{TaskEventDispatcher}} (FLINK-7746) and buffer writing to {{ResultPartition}} (FLINK-7748), we can finally remove the {{ResultPartitionWriter}} wrapper.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
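The idea in the FLINK-7748 commit message above, a single dispatcher that handles both event registration and publishing, keyed by partition, can be illustrated with the sketch below. It is an assumption-laden toy: `PartitionEventDispatcher`, the `String` partition IDs, and the `Consumer`-based listeners are invented for this example and do not reflect the actual `TaskEventDispatcher` API.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical dispatcher keyed by partition ID; not Flink's TaskEventDispatcher. */
final class PartitionEventDispatcher<E> {

    // One listener list per registered partition, owned by the dispatcher itself.
    private final Map<String, List<Consumer<E>>> handlers = new ConcurrentHashMap<>();

    /** Registers a partition so that listeners can subscribe to it. */
    void registerPartition(String partitionId) {
        List<Consumer<E>> previous =
                handlers.putIfAbsent(partitionId, new CopyOnWriteArrayList<Consumer<E>>());
        if (previous != null) {
            throw new IllegalStateException("Partition already registered: " + partitionId);
        }
    }

    /** Removes a partition together with all of its listeners. */
    void unregisterPartition(String partitionId) {
        handlers.remove(partitionId);
    }

    /** Subscribes a listener to events published for the given partition. */
    void subscribe(String partitionId, Consumer<E> listener) {
        List<Consumer<E>> listeners = handlers.get(partitionId);
        if (listeners == null) {
            throw new IllegalStateException("Partition not registered: " + partitionId);
        }
        listeners.add(listener);
    }

    /** Publishes an event; returns false if the partition is unknown to this dispatcher. */
    boolean publish(String partitionId, E event) {
        List<Consumer<E>> listeners = handlers.get(partitionId);
        if (listeners == null) {
            return false;
        }
        for (Consumer<E> listener : listeners) {
            listener.accept(event);
        }
        return true;
    }

    public static void main(String[] args) {
        PartitionEventDispatcher<String> dispatcher = new PartitionEventDispatcher<>();
        dispatcher.registerPartition("partition-1");
        dispatcher.subscribe("partition-1", event -> System.out.println("received " + event));
        dispatcher.publish("partition-1", "termination");
    }
}
```

Keeping subscription and publishing in one place means the partition object no longer needs to implement a listener interface, which is what allows a wrapper such as `ResultPartitionWriter` to be dropped.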
[jira] [Commented] (FLINK-7748) remove event listener behaviour from ResultPartitionWriter
[ https://issues.apache.org/jira/browse/FLINK-7748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188297#comment-16188297 ] ASF GitHub Bot commented on FLINK-7748: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/4761 [FLINK-7748][network] properly use the TaskEventDispatcher for subscribing to events ## What is the purpose of the change `ResultPartitionWriter` currently implements the `EventListener` interface and is used for event registration, although event publishing is already handled via the `TaskEventDispatcher`. Instead of using two different places, this should be unified by using `TaskEventDispatcher` only which is done by this PR. Please note that this PR builds upon #4759. ## Brief change log - make `TaskEventDispatcher` more generic to register result partitions via `ResultPartitionID` with one `TaskEventHandler` per partition (handled by `TaskEventDispatcher`, not inside `ResultPartitionWriter`) - remove the `EventListener` implementation from `ResultPartitionWriter` - add the `TaskEventDispatcher` to the `Environment` information for a task to be able to work with it (only `IterationHeadTask` is using this as of now) - adapt all places to use `TaskEventDispatcher` instead of `ResultPartitionWriter` ## Verifying this change This change added tests and can be verified as follows: - a new `TaskEventDispatcherTest` verifies that `TaskEventDispatcher` works as expected - indirectly, tests under `org.apache.flink.runtime.iterative` may verify the use of the `IterationHeadTask` which is the only user so far ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), 
Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (JavaDocs) You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-7748 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4761.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4761 commit d0cf483e0fb21010ed996935ffb14d62b34ee8ed Author: Nico Kruber Date: 2017-08-29T15:32:52Z [FLINK-7746][network] move ResultPartitionWriter#writeBufferToAllChannels implementation up into ResultPartition commit cfea094aa214612eaafef14554bc68626a6ff948 Author: Nico Kruber Date: 2017-08-29T16:24:00Z [FLINK-7748][network] properly use the TaskEventDispatcher for subscribing to events Previously, the ResultPartitionWriter implemented the EventListener interface and was used for event registration, although event publishing was already handled via the TaskEventDispatcher. Now, we use the TaskEventDispatcher for both, event registration and publishing. It also adds the TaskEventDispatcher to the Environment information for a task to be able to work with it (only IterationHeadTask so far). > remove event listener behaviour from ResultPartitionWriter > -- > > Key: FLINK-7748 > URL: https://issues.apache.org/jira/browse/FLINK-7748 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > {{ResultPartitionWriter}} currently implements the {{EventListener}} > interface and is used for event registration, although event publishing is > already handled via the {{TaskEventDispatcher}}. This should be unified by > using {{TaskEventDispatcher}} only. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
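For readers unfamiliar with the idea behind the change log above (registering partitions by ID, with one handler per partition, so that subscription and publishing go through a single component), here is a minimal, self-contained sketch. It is not Flink's `TaskEventDispatcher`: the class name, the method names, and the use of plain strings for partition IDs and events are all hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a per-partition event dispatcher.
// None of these names are taken from Flink's actual API.
class PartitionEventDispatcherSketch {

    // One listener list (the "handler") per registered partition ID.
    private final Map<String, List<Consumer<String>>> handlers = new HashMap<>();

    /** Registers a partition so that listeners can subscribe to its events. */
    synchronized void registerPartition(String partitionId) {
        if (handlers.putIfAbsent(partitionId, new ArrayList<>()) != null) {
            throw new IllegalStateException("Partition already registered: " + partitionId);
        }
    }

    /** Removes a partition together with all of its listeners. */
    synchronized void unregisterPartition(String partitionId) {
        handlers.remove(partitionId);
    }

    /** Subscribes a listener to events of a registered partition. */
    synchronized void subscribe(String partitionId, Consumer<String> listener) {
        List<Consumer<String>> listeners = handlers.get(partitionId);
        if (listeners == null) {
            throw new IllegalStateException("Partition not registered: " + partitionId);
        }
        listeners.add(listener);
    }

    /** Publishes an event; returns false if the partition is not registered. */
    synchronized boolean publish(String partitionId, String event) {
        List<Consumer<String>> listeners = handlers.get(partitionId);
        if (listeners == null) {
            return false;
        }
        for (Consumer<String> listener : listeners) {
            listener.accept(event);
        }
        return true;
    }

    public static void main(String[] args) {
        PartitionEventDispatcherSketch dispatcher = new PartitionEventDispatcherSketch();
        dispatcher.registerPartition("partition-1");
        dispatcher.subscribe("partition-1", e -> System.out.println("received: " + e));
        System.out.println(dispatcher.publish("partition-1", "termination"));
        System.out.println(dispatcher.publish("unknown", "termination"));
    }
}
```

The point of the sketch is only that registration, subscription and publishing all live in one place, so a writer-side wrapper no longer needs to act as an event listener itself.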
[jira] [Commented] (FLINK-7051) Bump up Calcite version to 1.14
[ https://issues.apache.org/jira/browse/FLINK-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188254#comment-16188254 ] Fabian Hueske commented on FLINK-7051: -- The vote for Calcite 1.14 has passed and it will be released very soon. > Bump up Calcite version to 1.14 > --- > > Key: FLINK-7051 > URL: https://issues.apache.org/jira/browse/FLINK-7051 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Haohui Mai > > This is an umbrella issue for all tasks that need to be done once Apache > Calcite 1.14 is released. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7728) StatusWatermarkValve has different min watermark advancement behavior depending on the ordering inputs become idle
[ https://issues.apache.org/jira/browse/FLINK-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7728. --- Resolution: Fixed Fix Version/s: (was: 1.2.2) Fixed on release-1.3 in 4dcd9fba21eef157063d2d70b0e55869009f2954 2875260d067c1e6754a248b8e281516ad9b5a269 c528139c3c2b33ba7bd11df154ba204408713b02 > StatusWatermarkValve has different min watermark advancement behavior > depending on the ordering inputs become idle > -- > > Key: FLINK-7728 > URL: https://issues.apache.org/jira/browse/FLINK-7728 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.1, 1.4.0, 1.3.2 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > Currently, once all inputs of a {{StatusWatermarkValve}} become idle, we > only emit the {{StreamStatus.IDLE}} marker, and check nothing else. This > makes the watermark advancement behavior inconsistent in the case that all > inputs become idle, depending on the order in which they become idle. > Consider the following setup: > {code} > Channel #1: Watermark 10, ACTIVE > Channel #2: Watermark 5, ACTIVE > Channel #3: Watermark 5, ACTIVE > {code} > If the channels become IDLE in the order #2 -> #3 -> #1, watermark 10 will > be emitted as the new min watermark once both #2 and #3 become idle. > On the other hand, if the order is #1 -> #2 -> #3, watermark 10 will not be > emitted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
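The order dependence described in the issue above can be reproduced with a small simulation. This is only a sketch under stated assumptions: it models channels as plain arrays rather than using Flink's `StatusWatermarkValve`, it assumes the valve has already emitted the minimum over all channels before any of them goes idle, and the `flushWhenAllIdle` switch (emit the maximum watermark across all channels once everything is idle) is one conceivable remedy, not necessarily what the merged fix does.

```java
import java.util.Arrays;

// Simplified model of the behaviour described in FLINK-7728.
// All names are hypothetical; this is not Flink's StatusWatermarkValve.
class IdleOrderSketch {

    /**
     * Marks channels idle in the given order and returns the last emitted watermark.
     *
     * @param watermarks       current watermark per channel (all channels start ACTIVE)
     * @param idleOrder        channel indices in the order in which they become idle
     * @param flushWhenAllIdle assumption: if true, emit the max watermark once all are idle
     */
    static long lastEmitted(long[] watermarks, int[] idleOrder, boolean flushWhenAllIdle) {
        boolean[] active = new boolean[watermarks.length];
        Arrays.fill(active, true);
        // Assumption: the min over all channels has already been emitted.
        long emitted = Arrays.stream(watermarks).min().getAsLong();

        for (int channel : idleOrder) {
            active[channel] = false;
            boolean anyActive = false;
            long minActive = Long.MAX_VALUE;
            long maxAll = Long.MIN_VALUE;
            for (int i = 0; i < watermarks.length; i++) {
                maxAll = Math.max(maxAll, watermarks[i]);
                if (active[i]) {
                    anyActive = true;
                    minActive = Math.min(minActive, watermarks[i]);
                }
            }
            if (anyActive) {
                // Advance to the min across the remaining active channels.
                emitted = Math.max(emitted, minActive);
            } else if (flushWhenAllIdle) {
                // Hypothetical remedy: flush the highest watermark seen on any channel.
                emitted = Math.max(emitted, maxAll);
            }
            // Otherwise only the IDLE marker would be emitted; the watermark stays put.
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] watermarks = {10, 5, 5}; // channels #1, #2, #3
        System.out.println(lastEmitted(watermarks, new int[] {1, 2, 0}, false)); // #2 -> #3 -> #1
        System.out.println(lastEmitted(watermarks, new int[] {0, 1, 2}, false)); // #1 -> #2 -> #3
        System.out.println(lastEmitted(watermarks, new int[] {0, 1, 2}, true));
    }
}
```

Without the flush, the first order ends at watermark 10 and the second at 5, which is the inconsistency the issue reports; with the flush enabled, both orders end at 10.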
[GitHub] flink issue #4747: [FLINK-7728] [DataStream] Flush StatusWatermarkValve once...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4747 Merged. 👌 Could you please also close this PR? ---
[jira] [Commented] (FLINK-7728) StatusWatermarkValve has different min watermark advancement behavior depending on the ordering inputs become idle
[ https://issues.apache.org/jira/browse/FLINK-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188232#comment-16188232 ] ASF GitHub Bot commented on FLINK-7728: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/4747 Merged. 👌 Could you please also close this PR? > StatusWatermarkValve has different min watermark advancement behavior > depending on the ordering inputs become idle > -- > > Key: FLINK-7728 > URL: https://issues.apache.org/jira/browse/FLINK-7728 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.1, 1.4.0, 1.3.2 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > Currently, once all inputs of a {{StatusWatermarkValve}} become idle, we > only emit the {{StreamStatus.IDLE}} marker, and check nothing else. This > makes the watermark advancement behavior inconsistent in the case that all > inputs become idle, depending on the order in which they become idle. > Consider the following setup: > {code} > Channel #1: Watermark 10, ACTIVE > Channel #2: Watermark 5, ACTIVE > Channel #3: Watermark 5, ACTIVE > {code} > If the channels become IDLE in the order #2 -> #3 -> #1, watermark 10 will > be emitted as the new min watermark once both #2 and #3 become idle. > On the other hand, if the order is #1 -> #2 -> #3, watermark 10 will not be > emitted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188220#comment-16188220 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r142159219 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,417 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. 
+ * + * Please note that the Scala API and Table API provide more specialized Types classes. + * (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types) + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and {@link java.lang.Byte}. +* Does not support a null value. +*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and {@link java.lang.Boolean}. +* Does not support a null value. 
+*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and {@link java.lang.Short}. +* Does not support a null value. +*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** +* Returns type information for both a primitive int and {@link java.lang.Integer}. +* Does not support a null value. +*/ + public static final TypeInformation INT = BasicTypeInfo.INT_TYPE_INFO; + + /** +* Returns type information for both a primitive long and {@link java.lang.Long}. +*
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188221#comment-16188221 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r142158422 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,417 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. 
--- End diff -- "or would result in an inefficient type" -> "or cases where automatic type inference results in an inefficient type." > Add helper methods for all built-in Flink types to Types > > > Key: FLINK-7452 > URL: https://issues.apache.org/jira/browse/FLINK-7452 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Timo Walther >Assignee: Timo Walther > > Sometimes it is very difficult to provide `TypeInformation` manually, in case > some extraction fails or is not available. {{TypeHint}}s should be the > preferred way but these methods can ensure correct types. > I propose to add all built-in Flink types to the {{Types}}. Such as: > {code} > Types.POJO(MyPojo.class) > Types.POJO(Map) > Types.GENERIC(Object.class) > Types.TUPLE(TypeInformation, ...) > Types.MAP(TypeInformation, TypeInformation) > {code} > The methods should validate that the returned type is exactly the requested > type and, especially in the case of POJOs, should help in creating {{PojoTypeInfo}}. > Once this is in place, we can deprecate the {{TypeInfoParser}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r142159219 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,417 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where type information has to be + * supplied manually or would result in an inefficient type. + * + * Please note that the Scala API and Table API provide more specialized Types classes. 
+ * (See org.apache.flink.api.scala.Types and org.apache.flink.table.api.Types) + * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and {@link java.lang.Byte}. +* Does not support a null value. +*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and {@link java.lang.Boolean}. +* Does not support a null value. +*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and {@link java.lang.Short}. +* Does not support a null value. 
+*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; + + /** +* Returns type information for both a primitive int and {@link java.lang.Integer}. +* Does not support a null value. +*/ + public static final TypeInformation INT = BasicTypeInfo.INT_TYPE_INFO; + + /** +* Returns type information for both a primitive long and {@link java.lang.Long}. +* Does not support a null value. +*/ + public static final TypeInformation LONG = BasicTypeInfo.LONG_TYPE_INFO; + + /** +* Returns type information for both a primitive float and {@link java.lang.Float}.
[jira] [Commented] (FLINK-7728) StatusWatermarkValve has different min watermark advancement behavior depending on the ordering inputs become idle
[ https://issues.apache.org/jira/browse/FLINK-7728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188218#comment-16188218 ] Aljoscha Krettek commented on FLINK-7728: - Fixed on master in 6481564cb581164a761444922ddfb4ad11a8ef55 f9b79662ef575580a841af36747618a29a3955d9 ea86a537e6db1606241c0b8afe03c0754176c85e > StatusWatermarkValve has different min watermark advancement behavior > depending on the ordering inputs become idle > -- > > Key: FLINK-7728 > URL: https://issues.apache.org/jira/browse/FLINK-7728 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.1, 1.4.0, 1.3.2 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.2.2, 1.4.0, 1.3.3 > > > Currently, once all inputs of a {{StatusWatermarkValve}} become idle, we > only emit the {{StreamStatus.IDLE}} marker, and check nothing else. This > makes the watermark advancement behavior inconsistent in the case that all > inputs become idle, depending on the order in which they become idle. > Consider the following setup: > {code} > Channel #1: Watermark 10, ACTIVE > Channel #2: Watermark 5, ACTIVE > Channel #3: Watermark 5, ACTIVE > {code} > If the channels become IDLE in the order #2 -> #3 -> #1, watermark 10 will > be emitted as the new min watermark once both #2 and #3 become idle. > On the other hand, if the order is #1 -> #2 -> #3, watermark 10 will not be > emitted. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7721) StatusWatermarkValve should output a new min watermark only if it was aggregated from aligned channels
[ https://issues.apache.org/jira/browse/FLINK-7721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek closed FLINK-7721. --- Resolution: Fixed Fixed on master in c81a7e519bf3db2f9c5e62fadd7a3a4fb7692904 > StatusWatermarkValve should output a new min watermark only if it was > aggregated from aligned channels > -- > > Key: FLINK-7721 > URL: https://issues.apache.org/jira/browse/FLINK-7721 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.1, 1.4.0, 1.3.2 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > Context: > {code} > long newMinWatermark = Long.MAX_VALUE; > for (InputChannelStatus channelStatus : channelStatuses) { > if (channelStatus.isWatermarkAligned) { > newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark); > } > } > {code} > In the calculation of the new min watermark in > {{StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels()}}, > there is no verification that the calculated new min watermark > {{newMinWatermark}} really is aggregated from some aligned channel. > In the corner case where all input channels are currently not aligned but > actually some are active, we would then incorrectly determine that the final > aggregation of {{newMinWatermark}} is {{Long.MAX_VALUE}} and emit that. > The fix would simply be to only emit the aggregated watermark IFF it was > really calculated from some aligned input channel (as well as the already > existing constraint that it needs to be larger than the last emitted > watermark). This change should also safely cover the case that a > {{Long.MAX_VALUE}} was genuinely aggregated from one of the input channels. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
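The fix proposed in the issue above (emit the aggregated watermark only if at least one aligned channel contributed to it, and only if it is larger than the last emitted one) can be sketched as follows. This is a standalone illustration, not the code that was merged: `ChannelStatus`, `newMinWatermark` and the `hasAlignedChannels` flag are hypothetical names modelled on the snippet quoted in the issue, and the result is returned as an `OptionalLong` instead of being written to an output.

```java
import java.util.OptionalLong;

// Standalone sketch of the guard described in FLINK-7721.
// Names are hypothetical and only loosely follow the snippet quoted in the issue.
class AlignedMinWatermarkSketch {

    /** Minimal stand-in for the per-channel status used by the valve. */
    static final class ChannelStatus {
        final long watermark;
        final boolean isWatermarkAligned;

        ChannelStatus(long watermark, boolean isWatermarkAligned) {
            this.watermark = watermark;
            this.isWatermarkAligned = isWatermarkAligned;
        }
    }

    /**
     * Returns the watermark to emit, or empty if nothing should be emitted.
     * A value is returned only if some aligned channel contributed to the minimum
     * and the minimum is larger than the last emitted watermark.
     */
    static OptionalLong newMinWatermark(ChannelStatus[] statuses, long lastOutputWatermark) {
        long newMin = Long.MAX_VALUE;
        boolean hasAlignedChannels = false;

        for (ChannelStatus status : statuses) {
            if (status.isWatermarkAligned) {
                hasAlignedChannels = true;
                newMin = Math.min(status.watermark, newMin);
            }
        }

        // Without the flag, an all-unaligned input would wrongly yield Long.MAX_VALUE.
        if (hasAlignedChannels && newMin > lastOutputWatermark) {
            return OptionalLong.of(newMin);
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        ChannelStatus[] noneAligned = {new ChannelStatus(3, false), new ChannelStatus(8, false)};
        ChannelStatus[] oneAligned = {new ChannelStatus(3, false), new ChannelStatus(8, true)};
        System.out.println(newMinWatermark(noneAligned, 5));
        System.out.println(newMinWatermark(oneAligned, 5));
    }
}
```

Tracking the flag separately, rather than testing `newMin != Long.MAX_VALUE`, keeps the case in which `Long.MAX_VALUE` was genuinely aggregated from an aligned channel working, as the issue requires.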
[jira] [Reopened] (FLINK-7721) StatusWatermarkValve should output a new min watermark only if it was aggregated from aligned channels
[ https://issues.apache.org/jira/browse/FLINK-7721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reopened FLINK-7721: - Reopen to fix git commit hash > StatusWatermarkValve should output a new min watermark only if it was > aggregated from aligned channels > -- > > Key: FLINK-7721 > URL: https://issues.apache.org/jira/browse/FLINK-7721 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.1, 1.4.0, 1.3.2 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > Context: > {code} > long newMinWatermark = Long.MAX_VALUE; > for (InputChannelStatus channelStatus : channelStatuses) { > if (channelStatus.isWatermarkAligned) { > newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark); > } > } > {code} > In the calculation of the new min watermark in > {{StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels()}}, > there is no verification that the calculated new min watermark > {{newMinWatermark}} really is aggregated from some aligned channel. > In the corner case where all input channels are currently not aligned but > actually some are active, we would then incorrectly determine that the final > aggregation of {{newMinWatermark}} is {{Long.MAX_VALUE}} and emit that. > The fix would simply be to only emit the aggregated watermark IFF it was > really calculated from some aligned input channel (as well as the already > existing constraint that it needs to be larger than the last emitted > watermark). This change should also safely cover the case that a > {{Long.MAX_VALUE}} was genuinely aggregated from one of the input channels. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7452) Add helper methods for all built-in Flink types to Types
[ https://issues.apache.org/jira/browse/FLINK-7452?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188202#comment-16188202 ] ASF GitHub Bot commented on FLINK-7452: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r142157854 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/Types.scala --- @@ -20,60 +20,123 @@ package org.apache.flink.table.api import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation, Types => JTypes} import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.typeutils.TimeIntervalTypeInfo -import org.apache.flink.types.Row import _root_.scala.annotation.varargs /** - * This class enumerates all supported types of the Table API. + * This class enumerates all supported types of the Table API & SQL. */ object Types { --- End diff -- Makes sense, thanks > Add helper methods for all built-in Flink types to Types > > > Key: FLINK-7452 > URL: https://issues.apache.org/jira/browse/FLINK-7452 > Project: Flink > Issue Type: Improvement > Components: Type Serialization System >Reporter: Timo Walther >Assignee: Timo Walther > > Sometimes it is very difficult to provide `TypeInformation` manually, in case > some extraction fails or is not available. {{TypeHint}}s should be the > preferred way but these methods can ensure correct types. > I propose to add all built-in Flink types to {{Types}}, such as: > {code} > Types.POJO(MyPojo.class) > Types.POJO(Map) > Types.GENERIC(Object.class) > Types.TUPLE(TypeInformation, ...) > Types.MAP(TypeInformation, TypeInformation) > {code} > The methods should validate that the returned type is exactly the requested > type. Especially in the case of POJOs, they should help create {{PojoTypeInfo}}. > Once this is in place, we can deprecate the {{TypeInfoParser}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4612: [FLINK-7452] [types] Add helper methods for all bu...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4612#discussion_r142157686 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java --- @@ -19,56 +19,408 @@ package org.apache.flink.api.common.typeinfo; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.functions.InvalidTypesException; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.EnumTypeInfo; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.PojoField; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Time; -import java.sql.Timestamp; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** - * This class gives access to the type information of the most most common types. + * This class gives access to the type information of the most common types for which Flink + * has built-in serializers and comparators. + * + * In many cases, Flink tries to analyze generic signatures of functions to determine return + * types automatically. This class is intended for cases where the extraction is not possible + * (or inefficient) as well as cases where type information has to be supplied manually. + * + * Depending on the API you are using (e.g. Scala API or Table API), there might be a more + * specialized Types class. 
+ * + * A more convenient alternative might be a {@link TypeHint}. + * + * @see TypeInformation#of(Class) specify type information based on a class that will be analyzed + * @see TypeInformation#of(TypeHint) specify type information based on a {@link TypeHint} */ @PublicEvolving public class Types { - public static final BasicTypeInfo STRING = BasicTypeInfo.STRING_TYPE_INFO; - public static final BasicTypeInfo BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; - public static final BasicTypeInfo BYTE = BasicTypeInfo.BYTE_TYPE_INFO; - public static final BasicTypeInfo SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final BasicTypeInfo INT = BasicTypeInfo.INT_TYPE_INFO; - public static final BasicTypeInfo LONG = BasicTypeInfo.LONG_TYPE_INFO; - public static final BasicTypeInfo FLOAT = BasicTypeInfo.FLOAT_TYPE_INFO; - public static final BasicTypeInfo DOUBLE = BasicTypeInfo.DOUBLE_TYPE_INFO; - public static final BasicTypeInfo DECIMAL = BasicTypeInfo.BIG_DEC_TYPE_INFO; + /** +* Returns type information for {@link java.lang.Void}. Does not support a null value. +*/ + public static final TypeInformation VOID = BasicTypeInfo.VOID_TYPE_INFO; + + /** +* Returns type information for {@link java.lang.String}. Supports a null value. +*/ + public static final TypeInformation STRING = BasicTypeInfo.STRING_TYPE_INFO; + + /** +* Returns type information for both a primitive byte and a +* wrapped {@link java.lang.Byte}. Does not support a null value. +*/ + public static final TypeInformation BYTE = BasicTypeInfo.BYTE_TYPE_INFO; + + /** +* Returns type information for both a primitive boolean and a +* wrapped {@link java.lang.Boolean}. Does not support a null value. +*/ + public static final TypeInformation BOOLEAN = BasicTypeInfo.BOOLEAN_TYPE_INFO; + + /** +* Returns type information for both a primitive short and a +* wrapped {@link java.lang.Short}. Does not support a null value. 
+*/ + public static final TypeInformation SHORT = BasicTypeInfo.SHORT_TYPE_INFO; - public static final SqlTimeTypeInfo SQL_DATE = SqlTimeTypeInfo.DATE; - public static final SqlTimeTypeInfo SQL_TIME = SqlTimeTypeInfo.TIME; - public static final SqlTimeTypeInfo SQL_TIMESTAMP = SqlTimeTypeInfo.TIMESTAMP; + /** +* Returns type information for both a primitive int and a +* wrapped {@link java.lang.Integer}. Does not support a null value. +*/ + public static final TypeInformation INT = BasicTypeInfo.INT_TYPE_INFO; + + /** +* Returns type information for both a primitive long and a +
[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142157044 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link JobSubmitHandler}. 
+ */ +public class JobSubmitHandlerTest { + + @Test + public void testSerializationFailureHandling() throws Exception { + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + JobSubmitHandler handler = new JobSubmitHandler( + CompletableFuture.completedFuture("http://localhost:1234"), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT); + + JobSubmitRequestBody request = new JobSubmitRequestBody(new byte[0]); + + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway); + + try { + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway).get(); + Assert.fail(); + } catch (ExecutionException ee) { + RestHandlerException rhe = (RestHandlerException) ee.getCause(); + + Assert.assertEquals(HttpResponseStatus.BAD_REQUEST, rhe.getHttpResponseStatus()); + } + } + + @Test + public void testSuccessfulJobSubmission() throws Exception { + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.submitJob(any(JobGraph.class), any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + JobSubmitHandler handler = new JobSubmitHandler( + CompletableFuture.completedFuture("http://localhost:1234"), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT); + + JobGraph job = new JobGraph("testjob"); + JobSubmitRequestBody request = new JobSubmitRequestBody(job); + + handler.handleRequest(new HandlerRequest<>(request, EmptyMessageParameters.getInstance()), mockGateway).get(); --- End diff -- we call get() on the returned future, which will fail if the job submission was not successful. But I will move the get() call to the next line, since it can be easily missed. ---
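The point made in the review comment above, that calling get() on the returned future is what makes the test fail on an unsuccessful submission, can be illustrated with plain `java.util.concurrent`. This is a sketch under stated assumptions, not Flink code: `FutureGetSketch`, `submit`, and `getSucceeds` are hypothetical names, and `submit` merely stands in for a handler call that returns a `CompletableFuture`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration; none of these names exist in Flink.
final class FutureGetSketch {

    // Stand-in for a handler call: completes normally or exceptionally.
    static CompletableFuture<String> submit(boolean succeed) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (succeed) {
            result.complete("ACK");
        } else {
            result.completeExceptionally(new IllegalStateException("submission failed"));
        }
        return result;
    }

    // Returns true if get() returned normally, false if it threw an ExecutionException.
    static boolean getSucceeds(CompletableFuture<String> future) {
        try {
            future.get();
            return true;
        } catch (ExecutionException e) {
            // The original failure is available via e.getCause().
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        // Keeping the future in a variable and calling get() on its own line
        // makes the failure check harder to overlook than a trailing .get().
        CompletableFuture<String> response = submit(true);
        response.get();

        // Without get(), a failed future would go unnoticed by the caller.
        System.out.println(getSucceeds(submit(false)));
    }
}
```

A test that only invokes the handler and discards the future would pass even when the submission failed, which is why the explicit get() matters.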
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188189#comment-16188189 ] ASF GitHub Bot commented on FLINK-7072: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142156635 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.messages.RequestBody; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.Arrays; + +/** + * Request for submitting a job. + * + * We currently require the job-jars to be uploaded through the blob-server. + */ +public final class JobSubmitRequestBody implements RequestBody { + + private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph"; + + /** +* The serialized job graph. 
+*/ + @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) + public final byte[] serializedJobGraph; --- End diff -- the current max is 10mb. > Create RESTful cluster endpoint > --- > > Key: FLINK-7072 > URL: https://issues.apache.org/jira/browse/FLINK-7072 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Chesnay Schepler > Labels: flip-6 > Fix For: 1.4.0 > > > In order to communicate with the cluster from the RESTful client, we have to > implement a RESTful cluster endpoint. The endpoint shall support the > following operations: > * List jobs (GET): Get list of all running jobs on the cluster > * Submit job (POST): Submit a job to the cluster (only supported in session > mode) > * Get job status (GET): Get the status of an executed job (and maybe the > JobExecutionResult) > * Lookup job leader (GET): Gets the JM leader for the given job > This endpoint will run in session mode alongside the dispatcher/session > runner and forward calls to this component which maintains a view on all > currently executed jobs. > In the per-job mode, the endpoint will return only the single running job and > the address of the JobManager alongside which it is running. Furthermore, it > won't accept job submissions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188190#comment-16188190 ] ASF GitHub Bot commented on FLINK-7072: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142156728 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link BlobServerPortHandler}. + */ +public class BlobServerPortHandlerTest { + private static final int PORT = 64; + + @Test + public void testPortRetrieval() throws Exception { + DispatcherGateway mockGateway = mock(DispatcherGateway.class); + when(mockGateway.getBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(PORT)); + GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + BlobServerPortHandler handler = new BlobServerPortHandler( + CompletableFuture.completedFuture("http://localhost:1234"), + mockGatewayRetriever, + RpcUtils.INF_TIMEOUT); + + BlobServerPortResponseBody portResponse = handler.handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()), mockGateway).get(); + + Assert.assertEquals(PORT, portResponse.port); + } + + @Test + public void testPortRetrievalFailureHandling() throws Exception { + + } --- End diff -- jup > Create RESTful cluster endpoint > --- > > Key: FLINK-7072 > URL: https://issues.apache.org/jira/browse/FLINK-7072 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Chesnay Schepler 
> Labels: flip-6 > Fix For: 1.4.0 > > > In order to communicate with the cluster from the RESTful client, we have to > implement a RESTful cluster endpoint. The endpoint shall support the > following operations: > * List jobs (GET): Get list of all running jobs on the cluster > * Submit job (POST): Submit a job to the cluster (only supported in session > mode) > * Get job status (GET): Get the status of an executed job (and maybe the > JobExecutionResult) > * Lookup job leader (GET): Gets the JM leader for the given job > This endpoint will run in session mode alongside the dispatcher/session > runner and forward calls to this component which maintains a view on all > currently executed jobs. > In the per-job mode, the endpoint will return only the single running job and > the address of the JobManager alongside which it is running. Furthermore, it > won't accept job submissions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188188#comment-16188188 ] ASF GitHub Bot commented on FLINK-7072: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142156568 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java --- @@ -28,8 +28,8 @@ */ public class JobTerminationMessageParameters extends MessageParameters { - private final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); - private final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter(); + public final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); + public final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter(); --- End diff -- so that you can actually access (and resolve) them in `RestClusterClient#stop()`. > Create RESTful cluster endpoint > --- > > Key: FLINK-7072 > URL: https://issues.apache.org/jira/browse/FLINK-7072 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Chesnay Schepler > Labels: flip-6 > Fix For: 1.4.0 > > > In order to communicate with the cluster from the RESTful client, we have to > implement a RESTful cluster endpoint. The endpoint shall support the > following operations: > * List jobs (GET): Get list of all running jobs on the cluster > * Submit job (POST): Submit a job to the cluster (only supported in session > mode) > * Get job status (GET): Get the status of an executed job (and maybe the > JobExecutionResult) > * Lookup job leader (GET): Gets the JM leader for the given job > This endpoint will run in session mode alongside the dispatcher/session > runner and forward calls to this component which maintains a view on all > currently executed jobs. 
> In the per-job mode, the endpoint will return only the single running job and > the address of the JobManager alongside which it is running. Furthermore, it > won't accept job submissions. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142156635 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitRequestBody.java --- @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.messages.RequestBody; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.Arrays; + +/** + * Request for submitting a job. + * + * We currently require the job-jars to be uploaded through the blob-server. + */ +public final class JobSubmitRequestBody implements RequestBody { + + private static final String FIELD_NAME_SERIALIZED_JOB_GRAPH = "serializedJobGraph"; + + /** +* The serialized job graph. +*/ + @JsonProperty(FIELD_NAME_SERIALIZED_JOB_GRAPH) + public final byte[] serializedJobGraph; --- End diff -- the current max is 10mb. ---
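The request body above carries the job graph as a `byte[]`, and the comment mentions a current maximum of 10mb. As a rough illustration only, the sketch below serializes an arbitrary `Serializable` with plain Java serialization and checks the result against a size limit. The class `SerializedPayloadSketch`, its methods, and the reading of "10mb" as 10 * 1024 * 1024 bytes are assumptions made for this example; the comment does not say which component enforces the limit, and none of this is Flink API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical helper for illustration; not part of Flink. */
final class SerializedPayloadSketch {

    /** Assumed interpretation of the "10mb" mentioned in the review comment. */
    static final int MAX_PAYLOAD_BYTES = 10 * 1024 * 1024;

    /** Serializes the object into a byte array using Java serialization. */
    static byte[] serialize(Serializable object) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes everything into `bytes`.
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(object);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns true if the serialized form stays within the assumed limit. */
    static boolean fitsLimit(byte[] serialized) {
        return serialized.length <= MAX_PAYLOAD_BYTES;
    }
}
```

A caller could check `fitsLimit(serialize(graph))` before sending and fail early with a clear message instead of relying on the server to reject an oversized request.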
[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142156568 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationMessageParameters.java --- @@ -28,8 +28,8 @@ */ public class JobTerminationMessageParameters extends MessageParameters { - private final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); - private final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter(); + public final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); + public final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter(); --- End diff -- so that you can actually access (and resolve) them in `RestClusterClient#stop()`. ---
[jira] [Commented] (FLINK-7072) Create RESTful cluster endpoint
[ https://issues.apache.org/jira/browse/FLINK-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188182#comment-16188182 ] ASF GitHub Bot commented on FLINK-7072: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142155888 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestServerEndpoint; +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import 
java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link RestClusterClient}. + */ +public class RestClusterClientTest { + + private static final String restAddress = "http://localhost:1234"; + private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class); + private static final GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + static { + when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress)); + when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway)); + } + + @Test + public void testABC() throws Exception { + + Configuration config = new Configuration(); + config.setString(JobManagerOptions.ADDRESS, "localhost"); + + RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config); + + TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler(); + TestJobSubmitHandler submitHandler = new TestJobSubmitHandler(); + TestJobTerminationHandler
[GitHub] flink pull request #4742: [FLINK-7072] [REST] Add Flip-6 client for submit/j...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4742#discussion_r142155888 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client.program.rest; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.rest.RestServerEndpoint; +import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders; +import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.JobTerminationHeaders; +import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters; +import org.apache.flink.runtime.rest.messages.TerminationModeQueryParameter; +import org.apache.flink.runtime.rest.messages.job.JobSubmitHeaders; +import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody; +import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import 
java.util.Collection; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link RestClusterClient}. + */ +public class RestClusterClientTest { + + private static final String restAddress = "http://localhost:1234"; + private static final Dispatcher mockRestfulGateway = mock(Dispatcher.class); + private static final GatewayRetriever mockGatewayRetriever = mock(GatewayRetriever.class); + + static { + when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress)); + when(mockGatewayRetriever.getNow()).thenReturn(Optional.of(mockRestfulGateway)); + } + + @Test + public void testABC() throws Exception { + + Configuration config = new Configuration(); + config.setString(JobManagerOptions.ADDRESS, "localhost"); + + RestServerEndpointConfiguration rsec = RestServerEndpointConfiguration.fromConfiguration(config); + + TestBlobServerPortHandler portHandler = new TestBlobServerPortHandler(); + TestJobSubmitHandler submitHandler = new TestJobSubmitHandler(); + TestJobTerminationHandler terminationHandler = new TestJobTerminationHandler(); + + RestServerEndpoint rse = new RestServerEndpoint(rsec) { + @Override + protected
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188181#comment-16188181 ] ASF GitHub Bot commented on FLINK-7406: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4509 @NicoK, I have fixed the other code issues. For the unit tests, I have only added one test so far, which verifies that there is no race condition between requesting and recycling floating buffers. I am not sure whether controlling the process with a `CountDownLatch` is enough, or whether I should submit two different threads that execute the process repeatedly. If this approach is OK, I will add more race-condition tests along with the `NetworkBufferPool#createBufferPool()` modifications you suggested. For the fair distribution of buffers, I will add one test, either in this PR or later in #4735. > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4509: [FLINK-7406][network] Implement Netty receiver incoming p...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4509 @NicoK, I have fixed the other code issues. For the unit tests, I have only added one test so far, which verifies that there is no race condition between requesting and recycling floating buffers. I am not sure whether controlling the process with a `CountDownLatch` is enough, or whether I should submit two different threads that execute the process repeatedly. If this approach is OK, I will add more race-condition tests along with the `NetworkBufferPool#createBufferPool()` modifications you suggested. For the fair distribution of buffers, I will add one test, either in this PR or later in #4735. ---
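The two testing options raised in this comment (a `CountDownLatch` versus two threads repeating the process) can also be combined. The sketch below is only an illustration of that idea: `TinyPool` is a hypothetical stand-in for a pool of floating buffers, not Flink's `NetworkBufferPool` or `RemoteInputChannel`, and the invariant it checks (no buffer is lost or handed out twice) is an assumption about what such a test would want to assert.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class FloatingBufferRaceSketch {

    /** Hypothetical stand-in for a pool of floating buffers. */
    static final class TinyPool {
        private final int capacity;
        private int available;

        TinyPool(int capacity) {
            this.capacity = capacity;
            this.available = capacity;
        }

        synchronized boolean request() {
            if (available == 0) {
                return false;
            }
            available--;
            return true;
        }

        synchronized void recycle() {
            if (available == capacity) {
                throw new IllegalStateException("more buffers recycled than requested");
            }
            available++;
        }

        synchronized int available() {
            return available;
        }
    }

    /** Races a requester against a recycler for several rounds; true if no buffer was lost. */
    static boolean runRounds(int capacity, int rounds) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            TinyPool pool = new TinyPool(capacity);
            int half = capacity / 2;
            for (int round = 0; round < rounds; round++) {
                // Hand out half of the buffers so the recycler has something to return.
                for (int i = 0; i < half; i++) {
                    pool.request();
                }
                CountDownLatch start = new CountDownLatch(1);
                AtomicInteger granted = new AtomicInteger();

                Future<?> recycler = executor.submit(() -> {
                    start.await(); // both tasks are released at the same moment
                    for (int i = 0; i < half; i++) {
                        pool.recycle();
                    }
                    return null;
                });
                Future<?> requester = executor.submit(() -> {
                    start.await();
                    for (int i = 0; i < half; i++) {
                        if (pool.request()) {
                            granted.incrementAndGet();
                        }
                    }
                    return null;
                });

                start.countDown();
                recycler.get(); // rethrows any failure from the task
                requester.get();

                // Invariant: every buffer is either available or held by the requester.
                if (pool.available() + granted.get() != capacity) {
                    return false;
                }
                for (int i = 0; i < granted.get(); i++) {
                    pool.recycle();
                }
            }
            return pool.available() == capacity;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The latch only makes the two tasks start together; it does not force a particular interleaving, which is why the sketch repeats the race over many rounds, for example `runRounds(8, 200)`.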
[jira] [Assigned] (FLINK-7709) Port CheckpointStatsDetailsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-7709: Assignee: Till Rohrmann > Port CheckpointStatsDetailsHandler to new REST endpoint > --- > > Key: FLINK-7709 > URL: https://issues.apache.org/jira/browse/FLINK-7709 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{CheckpointStatsDetailsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7406) Implement Netty receiver incoming pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188047#comment-16188047 ] ASF GitHub Bot commented on FLINK-7406: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r142142736 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java --- @@ -235,14 +240,15 @@ void releaseBuffer() { ByteBuf write(ByteBufAllocator allocator) throws IOException { checkNotNull(buffer, "No buffer instance to serialize."); - int length = 16 + 4 + 1 + 4 + buffer.getSize(); + int length = 16 + 4 + 4 + 1 + 4 + buffer.getSize(); --- End diff -- I think the lengths for the corresponding fields can be seen clearly in the below write method. > Implement Netty receiver incoming pipeline for credit-based > --- > > Key: FLINK-7406 > URL: https://issues.apache.org/jira/browse/FLINK-7406 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > Currently {{PartitionRequestClientHandler}} receives and reads > {{BufferResponse}} from producer. It will request buffer from {{BufferPool}} > for holding the message. If not got, the message is staged temporarily and > {{autoread}} for channel is set false. > For credit-based mode, {{PartitionRequestClientHandler}} can always get > buffer from {{RemoteInputChannel}} for reading messages from producer. > The related works are: > * Add the backlog of producer in {{BufferResponse}} message structure > * {{PartitionRequestClientHandler}} requests buffer from > {{RemoteInputChannel}} directly > * {{PartitionRequestClientHandler}} updates backlog for > {{RemoteInputChannel}}, and it may trigger requests of floating buffers from > {{BufferPool}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4509: [FLINK-7406][network] Implement Netty receiver inc...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4509#discussion_r142142736 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java --- @@ -235,14 +240,15 @@ void releaseBuffer() { ByteBuf write(ByteBufAllocator allocator) throws IOException { checkNotNull(buffer, "No buffer instance to serialize."); - int length = 16 + 4 + 1 + 4 + buffer.getSize(); + int length = 16 + 4 + 4 + 1 + 4 + buffer.getSize(); --- End diff -- I think the lengths for the corresponding fields can be seen clearly in the below write method. ---
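For readers who do not have the write method at hand, the length expression in the diff can be spelled out with named constants. The following is a sketch only: the diff shows just the numbers, so the mapping of each term to a field (receiver ID, sequence number, backlog, buffer flag, size) is an assumption based on the JIRA description, which says that the backlog is added to `BufferResponse`. The class `BufferResponseFrameSketch` is hypothetical and uses `java.nio.ByteBuffer` rather than Netty's `ByteBuf`.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration of the length arithmetic; not Flink's NettyMessage. */
final class BufferResponseFrameSketch {

    static final int RECEIVER_ID_BYTES = 2 * Long.BYTES;    // 16, assumed: ID stored as two longs
    static final int SEQUENCE_NUMBER_BYTES = Integer.BYTES; // 4
    static final int BACKLOG_BYTES = Integer.BYTES;         // 4, the term added by the diff
    static final int IS_BUFFER_BYTES = Byte.BYTES;          // 1
    static final int SIZE_BYTES = Integer.BYTES;            // 4

    static final int HEADER_BYTES = RECEIVER_ID_BYTES + SEQUENCE_NUMBER_BYTES
            + BACKLOG_BYTES + IS_BUFFER_BYTES + SIZE_BYTES;

    /** Writes the assumed fields followed by the payload and returns the readable frame. */
    static ByteBuffer write(long idUpper, long idLower, int sequenceNumber, int backlog,
            boolean isBuffer, byte[] payload) {
        ByteBuffer frame = ByteBuffer.allocate(HEADER_BYTES + payload.length);
        frame.putLong(idUpper);
        frame.putLong(idLower);
        frame.putInt(sequenceNumber);
        frame.putInt(backlog);
        frame.put((byte) (isBuffer ? 1 : 0));
        frame.putInt(payload.length);
        frame.put(payload);
        frame.flip(); // switch from writing to reading
        return frame;
    }
}
```

With these constants, `HEADER_BYTES` evaluates to 29, matching `16 + 4 + 4 + 1 + 4` in the diff, and the total frame length is `HEADER_BYTES` plus the payload size.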
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188030#comment-16188030 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r142136764 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java --- @@ -407,4 +419,72 @@ static void closeSilently(Socket socket, Logger LOG) { private BlobUtils() { throw new RuntimeException(); } + + /** +* Moves the temporary incomingFile to its permanent location where it is available for +* use. +* +* @param incomingFile +* temporary file created during transfer +* @param jobId +* ID of the job this blob belongs to or null if job-unrelated +* @param blobKey +* BLOB key identifying the file +* @param storageFile +* (local) file where the blob is/should be stored +* @param writeLock +* lock to acquire before doing the move +* @param log +* logger for debug information +* @param blobStore +* HA store (or null if unavailable) +* +* @throws IOException +* thrown if an I/O error occurs while moving the file or uploading it to the HA store +*/ + static void moveTempFileToStore( + File incomingFile, @Nullable JobID jobId, BlobKey blobKey, File storageFile, + Lock writeLock, Logger log, @Nullable BlobStore blobStore) throws IOException { + + writeLock.lock(); + + try { + // first check whether the file already exists + if (!storageFile.exists()) { + try { + // only move the file if it does not yet exist + Files.move(incomingFile.toPath(), storageFile.toPath()); + + incomingFile = null; + + } catch (FileAlreadyExistsException ignored) { + log.warn("Detected concurrent file modifications. This should only happen if multiple" + + "BlobServer use the same storage directory."); + // we cannot be sure at this point whether the file has already been uploaded to the blob + // store or not. 
Even if the blobStore might shortly be in an inconsistent state, we have --- End diff -- "we have to" > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
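The core pattern in the quoted `moveTempFileToStore` (take the write lock, move the temporary file only if the target does not exist yet, tolerate a concurrent writer) can be sketched in isolation. This is a simplified illustration under stated assumptions: `MoveTempFileSketch`, `moveIfAbsent`, and `demo` are hypothetical names, the upload to the HA store is left out entirely, and deleting the leftover temporary file in `finally` is my guess at the intended cleanup, since the quoted diff is cut off before that point.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, simplified illustration; not Flink's BlobUtils. */
final class MoveTempFileSketch {

    /** Moves `incoming` to `target` unless the target exists; true if this call moved it. */
    static boolean moveIfAbsent(Path incoming, Path target, Lock writeLock) {
        writeLock.lock();
        try {
            if (Files.exists(target)) {
                return false;
            }
            try {
                // Without REPLACE_EXISTING, move fails if the target already exists.
                Files.move(incoming, target);
                return true;
            } catch (FileAlreadyExistsException concurrentWriter) {
                // Another process sharing the directory was faster; keep its file.
                return false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            try {
                // Assumed cleanup: drop the temporary file if it was not moved.
                Files.deleteIfExists(incoming);
            } catch (IOException ignored) {
                // Best effort only in this sketch.
            }
            writeLock.unlock();
        }
    }

    /** Small usage example: the second move must not overwrite the first. */
    static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("blob-sketch");
            Path target = dir.resolve("blob");
            Path first = Files.write(dir.resolve("incoming-1"), new byte[] {1});
            Path second = Files.write(dir.resolve("incoming-2"), new byte[] {2});
            Lock lock = new ReentrantLock();
            boolean movedFirst = moveIfAbsent(first, target, lock);
            boolean movedSecond = moveIfAbsent(second, target, lock);
            return movedFirst && !movedSecond
                    && Files.readAllBytes(target)[0] == 1
                    && Files.notExists(second);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The in-process lock only protects against threads of the same JVM; the `FileAlreadyExistsException` branch covers the case named in the log message, where several servers use the same storage directory.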
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188031#comment-16188031 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r142128358 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java --- @@ -18,89 +18,21 @@ package org.apache.flink.runtime.blob; -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.JobID; -import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import javax.annotation.Nullable; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The BLOB cache implements a local cache for content-addressable BLOBs. - * - * When requesting BLOBs through the {@link BlobCache#getFile} methods, the - * BLOB cache will first attempt to serve the file from its local cache. Only if - * the local cache does not contain the desired BLOB, the BLOB cache will try to - * download it from a distributed file system (if available) or the BLOB - * server. + * The BLOB cache provides access to BLOB services for permanent and transient BLOBs. */ -public class BlobCache extends TimerTask implements BlobService { - - /** The log object used for debugging. 
*/ - private static final Logger LOG = LoggerFactory.getLogger(BlobCache.class); - - private final InetSocketAddress serverAddress; - - /** Root directory for local file storage */ - private final File storageDir; - - /** Blob store for distributed file storage, e.g. in HA */ - private final BlobView blobView; - - private final AtomicBoolean shutdownRequested = new AtomicBoolean(); - - /** Shutdown hook thread to ensure deletion of the storage directory. */ - private final Thread shutdownHook; - - /** The number of retries when the transfer fails */ - private final int numFetchRetries; - - /** Configuration for the blob client like ssl parameters required to connect to the blob server */ - private final Configuration blobClientConfig; - - // - - /** -* Job reference counters with a time-to-live (TTL). -*/ - private static class RefCount { - /** -* Number of references to a job. -*/ - public int references = 0; - - /** -* Timestamp in milliseconds when any job data should be cleaned up (no cleanup for -* non-positive values). -*/ - public long keepUntil = -1; - } - - /** Map to store the number of references to a specific job */ - private final Map<JobID, RefCount> jobRefCounters = new HashMap<>(); +public class BlobCache implements BlobService { --- End diff -- Could this be called `BlobServiceImpl`? > change BlobService sub-classes for permanent and transient BLOBs > > > Key: FLINK-7068 > URL: https://issues.apache.org/jira/browse/FLINK-7068 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > A {{PermanentBlobStore}} should resemble use cases for BLOBs that are > permanently stored for a job's life time (HA and non-HA). > A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc. > which even does not have to be reflected by files. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188032#comment-16188032 ] ASF GitHub Bot commented on FLINK-7068: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4358#discussion_r142138008 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/PermanentBlobCache.java --- @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.FileUtils; + +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Provides a cache for permanent BLOB files including a per-job ref-counting and a staged cleanup. 
+ * + * When requesting BLOBs via {@link #getPermanentFile(JobID, BlobKey)}, the cache will first attempt to + * serve the file from its local cache. Only if the local cache does not contain the desired BLOB, + * it will try to download it from a distributed HA file system (if available) or the BLOB server. + * + * If files for a job are not needed any more, they will enter a staged, i.e. deferred, cleanup. + * Files may thus still be be accessible upon recovery and do not need to be re-downloaded. + */ +public class PermanentBlobCache extends AbstractBlobCache implements PermanentBlobService { + + /** +* Job reference counters with a time-to-live (TTL). +*/ + @VisibleForTesting + static class RefCount { + /** +* Number of references to a job. +*/ + public int references = 0; + + /** +* Timestamp in milliseconds when any job data should be cleaned up (no cleanup for +* non-positive values). +*/ + public long keepUntil = -1; + } + + /** +* Map to store the number of references to a specific job. +*/ + private final Map<JobID, RefCount> jobRefCounters = new HashMap<>(); + + /** +* Time interval (ms) to run the cleanup task; also used as the default TTL. +*/ + private final long cleanupInterval; + + private final Timer cleanupTimer; + + /** +* Instantiates a new cache for permanent BLOBs which are also available in an HA store. 
+* +* @param serverAddress +* address of the {@link BlobServer} to use for fetching files from +* @param blobClientConfig +* global configuration +* @param blobView +* (distributed) HA blob store file system to retrieve files from first +* +* @throws IOException +* thrown if the (local or distributed) file storage cannot be created or is not usable +*/ + public PermanentBlobCache( + final InetSocketAddress serverAddress, + final Configuration blobClientConfig, + final BlobView blobView) throws IOException { + + super(serverAddress, blobClientConfig, blobView, + LoggerFactory.getLogger(PermanentBlobCache.class)); + + // Initializing the clean up task + this.cleanupTimer = new Timer(true); + + this.cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000; +
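The Javadoc in this diff describes per-job reference counting with a time-to-live and a staged (deferred) cleanup. Since the quoted file is cut off before the relevant methods, the sketch below only illustrates how such bookkeeping could work. `RefCountTtlSketch` and its methods are hypothetical, job IDs are plain strings, and the exact rules (a release to zero sets `keepUntil` to now plus the TTL, a new registration resets it, cleanup removes expired entries) are assumptions derived from the field comments, not the PR's actual logic.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical illustration of ref-counting with a TTL; not Flink's PermanentBlobCache. */
final class RefCountTtlSketch {

    static final class RefCount {
        int references = 0;
        long keepUntil = -1; // non-positive: no cleanup scheduled
    }

    private final Map<String, RefCount> jobRefCounters = new HashMap<>();
    private final long ttlMillis;

    RefCountTtlSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Registers one more user of the job's files and cancels any pending cleanup. */
    synchronized void register(String jobId) {
        RefCount ref = jobRefCounters.computeIfAbsent(jobId, id -> new RefCount());
        ref.references++;
        ref.keepUntil = -1;
    }

    /** Releases one user; the last release schedules the deferred cleanup. */
    synchronized void release(String jobId, long nowMillis) {
        RefCount ref = jobRefCounters.get(jobId);
        if (ref == null || ref.references == 0) {
            return;
        }
        ref.references--;
        if (ref.references == 0) {
            ref.keepUntil = nowMillis + ttlMillis;
        }
    }

    /** Removes all unreferenced jobs whose TTL has expired; returns how many were removed. */
    synchronized int cleanup(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, RefCount>> entries = jobRefCounters.entrySet().iterator();
        while (entries.hasNext()) {
            RefCount ref = entries.next().getValue();
            if (ref.references <= 0 && ref.keepUntil > 0 && nowMillis >= ref.keepUntil) {
                entries.remove(); // a real cache would also delete the job's files here
                removed++;
            }
        }
        return removed;
    }

    synchronized boolean isTracked(String jobId) {
        return jobRefCounters.containsKey(jobId);
    }
}
```

Passing the current time in explicitly keeps the sketch deterministic; a periodic task, such as the `Timer` in the diff, would call `cleanup(System.currentTimeMillis())` at the configured interval.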
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188039#comment-16188039 ]

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142126654

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.blob;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.configuration.BlobServerOptions;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.FileUtils;
    +
    +import org.slf4j.Logger;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.Closeable;
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.InetSocketAddress;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.concurrent.atomic.AtomicLong;
    +import java.util.concurrent.locks.ReadWriteLock;
    +import java.util.concurrent.locks.ReentrantReadWriteLock;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
    +/**
    + * Abstract base class for permanent and transient BLOB files.
    + */
    +public abstract class AbstractBlobCache implements Closeable {
    +
    +    /**
    +     * The log object used for debugging.
    +     */
    +    protected final Logger LOG;
    +
    +    /**
    +     * Counter to generate unique names for temporary files.
    +     */
    +    protected final AtomicLong tempFileCounter = new AtomicLong(0);
    +
    +    protected final InetSocketAddress serverAddress;
    +
    +    /**
    +     * Root directory for local file storage.
    +     */
    +    protected final File storageDir;
    +
    +    /**
    +     * Blob store for distributed file storage, e.g. in HA.
    +     */
    +    protected final BlobView blobView;
    +
    +    protected final AtomicBoolean shutdownRequested = new AtomicBoolean();
    +
    +    /**
    +     * Shutdown hook thread to ensure deletion of the local storage directory.
    +     */
    +    protected final Thread shutdownHook;
    +
    +    /**
    +     * The number of retries when the transfer fails.
    +     */
    +    protected final int numFetchRetries;
    +
    +    /**
    +     * Configuration for the blob client like ssl parameters required to connect to the blob
    +     * server.
    +     */
    +    protected final Configuration blobClientConfig;
    +
    +    /**
    +     * Lock guarding concurrent file accesses.
    +     */
    +    protected final ReadWriteLock readWriteLock;
    +
    +    public AbstractBlobCache(
    +            final InetSocketAddress serverAddress,
    +            final Configuration blobClientConfig,
    +            final BlobView blobView,
    +            final Logger logger) throws IOException {
    +
    +        this.LOG = logger;
    +
    +        this.serverAddress = checkNotNull(serverAddress);
    +        this.blobClientConfig = checkNotNull(blobClientConfig);
    +        this.blobView = checkNotNull(blobView, "blobStore");
    --- End diff --

    `blobStore` != `blobView`

> change BlobService sub-classes for permanent and transient BLOBs
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc.
> which even
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188036#comment-16188036 ]

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142138928

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -468,7 +468,7 @@ class JobManager(
               taskManagerGateway match {
                 case x: ActorTaskManagerGateway =>
                   handleTaskManagerTerminated(x.getActorGateway().actor(), instance.getId)
    -            case _ => log.debug(s"Cannot remove resource ${resourceID}, because there is " +
    +            case _ => log.debug(s"Cannot remove reosurce ${resourceID}, because there is " +
    --- End diff --

    typo resource

> change BlobService sub-classes for permanent and transient BLOBs
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc.
> which even does not have to be reflected by files.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188026#comment-16188026 ]

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142129505

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -120,6 +125,76 @@ public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) t
             }
         }
    +    /**
    +     * Downloads the given BLOB from the given server and stores its contents to a (local) file.
    +     *
    +     * Transient BLOB files are deleted after a successful copy of the server's data into the
    +     * given localJarFile.
    +     *
    +     * @param jobId
    +     *         job ID the BLOB belongs to or null if job-unrelated
    +     * @param blobKey
    +     *         BLOB key
    +     * @param localJarFile
    +     *         the local file to write to
    +     * @param serverAddress
    +     *         address of the server to download from
    +     * @param blobClientConfig
    +     *         client configuration for the connection
    +     * @param numFetchRetries
    +     *         number of retries before failing
    +     *
    +     * @throws IOException
    +     *         if an I/O error occurs during the download
    +     */
    +    static void downloadFromBlobServer(
    +            @Nullable JobID jobId, BlobKey blobKey, File localJarFile,
    +            InetSocketAddress serverAddress, Configuration blobClientConfig, int numFetchRetries)
    +            throws IOException {
    +
    +        final byte[] buf = new byte[BUFFER_SIZE];
    +        LOG.info("Downloading {}/{} from {}", jobId, blobKey, serverAddress);
    +
    +        // loop over retries
    +        int attempt = 0;
    +        while (true) {
    +            try (
    +                final BlobClient bc = new BlobClient(serverAddress, blobClientConfig);
    +                final InputStream is = bc.getInternal(jobId, blobKey);
    +                final OutputStream os = new FileOutputStream(localJarFile)
    +            ) {
    +                while (true) {
    +                    final int read = is.read(buf);
    +                    if (read < 0) {
    +                        break;
    +                    }
    +                    os.write(buf, 0, read);
    +                }
    +
    +                return;
    +            }
    +            catch (Throwable t) {
    +                String message = "Failed to fetch BLOB " + jobId + "/" + blobKey + " from " + serverAddress +
    +                    " and store it under " + localJarFile.getAbsolutePath();
    +                if (attempt < numFetchRetries) {
    +                    if (LOG.isDebugEnabled()) {
    +                        LOG.debug(message + " Retrying...", t);
    --- End diff --

    Shouldn't this also be an error if the other log statement is an error as well?

> change BlobService sub-classes for permanent and transient BLOBs
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc.
> which even does not have to be reflected by files.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
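The retry loop discussed in this review comment can be sketched in isolation. This is a minimal illustration, not the Flink code: `RetrySketch`, `Fetcher`, and `fetchWithRetries` are hypothetical names, `java.util.logging` stands in for slf4j so the block has no dependencies, and it catches `IOException` where the diff catches `Throwable`. It shows one way to resolve the question raised above, by logging the retry message at the same level as the final failure.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

class RetrySketch {
    private static final Logger LOG = Logger.getLogger(RetrySketch.class.getName());

    /** Hypothetical stand-in for the actual download step. */
    public interface Fetcher {
        void fetch() throws IOException;
    }

    /**
     * Runs the fetcher and retries up to numFetchRetries times after a failure.
     * Returns the number of attempts that were needed (1-based).
     */
    public static int fetchWithRetries(Fetcher fetcher, int numFetchRetries) throws IOException {
        int attempt = 0;
        while (true) {
            try {
                fetcher.fetch();
                return attempt + 1;
            } catch (IOException e) {
                String message = "Failed to fetch (attempt " + (attempt + 1) + ").";
                if (attempt < numFetchRetries) {
                    // Same level as the final failure below, so both statements are consistent.
                    LOG.log(Level.SEVERE, message + " Retrying...", e);
                } else {
                    LOG.log(Level.SEVERE, message + " No retries left.", e);
                    throw new IOException(message, e);
                }
                attempt++;
            }
        }
    }
}
```

Whether a failed attempt that will be retried deserves the error level is a judgment call; a lower level for retries keeps logs quieter when transient failures are expected.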
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188029#comment-16188029 ]

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142125479

    --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java ---
    @@ -158,9 +151,9 @@ protected void respondAsLeader(final ChannelHandlerContext ctx, final Routed rou
                     cache = blobPortFuture.thenApplyAsync(
                         (Integer port) -> {
                             try {
    -                            return new BlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView);
    +                            return new TransientBlobCache(new InetSocketAddress(jobManagerGateway.getHostname(), port), config);
                             } catch (IOException e) {
    -                            throw new FlinkFutureException("Could not create BlobCache.", e);
    +                            throw new FlinkFutureException("Could not create TransientBlobCache.", e);
    --- End diff --

    When rebasing, then this will be a `java.util.concurrent.CompletionException`.

> change BlobService sub-classes for permanent and transient BLOBs
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc.
> which even does not have to be reflected by files.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
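For readers unfamiliar with the pattern the comment refers to, here is a minimal sketch of wrapping a checked exception in `java.util.concurrent.CompletionException` inside `thenApplyAsync`. It is not the Flink handler: `CompletionExceptionSketch`, `createCache`, and `cacheFor` are hypothetical names, and a plain `String` stands in for the cache object.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class CompletionExceptionSketch {

    /** Hypothetical stand-in for constructing the cache; fails for a non-positive port. */
    static String createCache(int port) throws IOException {
        if (port <= 0) {
            throw new IOException("Invalid port " + port);
        }
        return "cache@" + port;
    }

    /** Maps a port future to a cache future, rethrowing checked failures as unchecked ones. */
    static CompletableFuture<String> cacheFor(CompletableFuture<Integer> portFuture) {
        return portFuture.thenApplyAsync(
            (Integer port) -> {
                try {
                    return createCache(port);
                } catch (IOException e) {
                    // Function.apply cannot throw checked exceptions, so wrap the cause.
                    throw new CompletionException("Could not create cache.", e);
                }
            });
    }
}
```

A caller that uses `join()` on the returned future sees the `CompletionException` with the original `IOException` as its cause.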
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188024#comment-16188024 ]

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142129292

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
    @@ -120,6 +125,76 @@ public BlobClient(InetSocketAddress serverAddress, Configuration clientConfig) t
             }
         }
    +    /**
    +     * Downloads the given BLOB from the given server and stores its contents to a (local) file.
    +     *
    +     * Transient BLOB files are deleted after a successful copy of the server's data into the
    +     * given localJarFile.
    +     *
    +     * @param jobId
    +     *         job ID the BLOB belongs to or null if job-unrelated
    +     * @param blobKey
    +     *         BLOB key
    +     * @param localJarFile
    +     *         the local file to write to
    +     * @param serverAddress
    +     *         address of the server to download from
    +     * @param blobClientConfig
    +     *         client configuration for the connection
    +     * @param numFetchRetries
    +     *         number of retries before failing
    +     *
    +     * @throws IOException
    +     *         if an I/O error occurs during the download
    +     */
    +    static void downloadFromBlobServer(
    +            @Nullable JobID jobId, BlobKey blobKey, File localJarFile,
    +            InetSocketAddress serverAddress, Configuration blobClientConfig, int numFetchRetries)
    --- End diff --

    Wrapping the parameters could be one per line.

> change BlobService sub-classes for permanent and transient BLOBs
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc.
> which even does not have to be reflected by files.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7068) change BlobService sub-classes for permanent and transient BLOBs
[ https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16188025#comment-16188025 ]

ASF GitHub Bot commented on FLINK-7068:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4358#discussion_r142133777

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
    @@ -375,7 +378,32 @@ public File getFile(BlobKey key) throws IOException {
          *         Thrown if the file retrieval failed.
          */
         @Override
    -    public File getFile(JobID jobId, BlobKey key) throws IOException {
    +    public File getTransientFile(JobID jobId, BlobKey key) throws IOException {
    +        checkNotNull(jobId);
    +        return getFileInternal(jobId, key);
    +    }
    +
    +    /**
    +     * Returns the path to a local copy of the file associated with the provided job ID and blob
    +     * key.
    +     *
    +     * We will first attempt to serve the BLOB from the local storage. If the BLOB is not in
    +     * there, we will try to download it from the HA store.
    +     *
    +     * @param jobId
    +     *         ID of the job this blob belongs to
    +     * @param key
    +     *         blob key associated with the requested file
    +     *
    +     * @return The path to the file.
    +     *
    +     * @throws java.io.FileNotFoundException
    +     *         if the BLOB does not exist;
    +     * @throws IOException
    +     *         if any other error occurs when retrieving the file
    +     */
    +    @Override
    +    public File getPermanentFile(JobID jobId, BlobKey key) throws IOException {
    --- End diff --

    Same here for `BlobType.PERMANENT`

> change BlobService sub-classes for permanent and transient BLOBs
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should resemble use cases for BLOBs that are
> permanently stored for a job's life time (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc.
> which even does not have to be reflected by files.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)