[jira] [Closed] (FLINK-7820) deprecate docs of FoldingState and FoldingStateDescriptor
[ https://issues.apache.org/jira/browse/FLINK-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li closed FLINK-7820. --- Resolution: Fixed This is done as part of FLINK-5967. > deprecate docs of FoldingState and FoldingStateDescriptor > - > > Key: FLINK-7820 > URL: https://issues.apache.org/jira/browse/FLINK-7820 > Project: Flink > Issue Type: Improvement > Components: Queryable State > Affects Versions: 1.4.0 > Reporter: Bowen Li > Assignee: Bowen Li > Priority: Minor > Fix For: 1.4.0 > > > {{FoldingState}} and {{FoldingStateDescriptor}} have been deprecated. We should > remove docs related to the two classes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6615) tmp directory not cleaned up on shutdown
[ https://issues.apache.org/jira/browse/FLINK-6615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222978#comment-16222978 ] Bowen Li commented on FLINK-6615: - I would recommend closing this ticket as 'Not a problem', if there's no objection > tmp directory not cleaned up on shutdown > > > Key: FLINK-6615 > URL: https://issues.apache.org/jira/browse/FLINK-6615 > Project: Flink > Issue Type: Bug > Components: Local Runtime >Affects Versions: 1.2.0 >Reporter: Andrey >Assignee: Bowen Li > > Steps to reproduce: > 1) Stop task manager gracefully (kill -6 ) > 2) In the logs: > {code} > 2017-05-17 13:35:50,147 INFO org.apache.zookeeper.ClientCnxn > - EventThread shut down [main-EventThread] > 2017-05-17 13:35:50,200 ERROR > org.apache.flink.runtime.io.disk.iomanager.IOManager - IOManager > failed to properly clean up temp file directory: > /mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47 > [flink-akka.actor.default-dispatcher-2] > java.nio.file.DirectoryNotEmptyException: > /mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47 > at > sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:242) > at > sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103) > at java.nio.file.Files.delete(Files.java:1126) > at org.apache.flink.util.FileUtils.deleteDirectory(FileUtils.java:154) > at > org.apache.flink.runtime.io.disk.iomanager.IOManager.shutdown(IOManager.java:109) > at > org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.shutdown(IOManagerAsync.java:185) > at > org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:241) > at akka.actor.Actor$class.aroundPostStop(Actor.scala:477) > {code} > Expected: > * on shutdown delete non-empty directory anyway. > Notes: > * after process terminated, I've checked > "/mnt/flink/tmp/flink-io-66f1d0ec-8976-41bf-9575-f80b181b0e47" directory and > didn't find anything there. So it looks like timing issue. 
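For reference, the "delete non-empty directory anyway" expectation can be sketched with plain `java.nio`. This is only an illustration under stated assumptions: `RecursiveDeleteSketch` and `deleteRecursively` are hypothetical names, this is not Flink's `FileUtils.deleteDirectory`, and it does not address files that another thread creates while the walk is running.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical helper, not part of Flink.
final class RecursiveDeleteSketch {

    // Deletes dir and everything below it, children before parents.
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return; // nothing to delete
        }
        Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                Files.deleteIfExists(file); // no error if the file is already gone
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
                if (exc instanceof NoSuchFileException) {
                    return FileVisitResult.CONTINUE; // file vanished between listing and visiting
                }
                throw exc;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path d, IOException exc) throws IOException {
                if (exc != null) {
                    throw exc;
                }
                Files.deleteIfExists(d); // directory is empty once its children are deleted
                return FileVisitResult.CONTINUE;
            }
        });
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("flink-io-sketch");
        Files.createFile(Files.createDirectories(root.resolve("sub")).resolve("spill.tmp"));
        deleteRecursively(root);
        System.out.println("still exists: " + Files.exists(root));
    }
}
```

Deleting children in `visitFile` and the directory itself in `postVisitDirectory` avoids calling `Files.delete` on a directory that still has entries, which is the call that raised `DirectoryNotEmptyException` in the log above.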
[jira] [Commented] (FLINK-7935) Metrics with user supplied scope variables
[ https://issues.apache.org/jira/browse/FLINK-7935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222972#comment-16222972 ]

Elias Levy commented on FLINK-7935:
-----------------------------------

Possibly. It depends on whether you could add multiple metrics or metric groups that differ in their variables but that could be formatted the same. E.g. the TaskManagerJobMetricGroup creates and tracks a distinct TaskMetricGroup for each task in the portion of a job that the task manager is executing. The metrics for each task are tracked separately, but I can format the scope so all of them are reported with the same name ("taskmanager.job.task") but with different variables/DD tags.

> Metrics with user supplied scope variables
> ------------------------------------------
>
> Key: FLINK-7935
> URL: https://issues.apache.org/jira/browse/FLINK-7935
> Project: Flink
> Issue Type: Improvement
> Affects Versions: 1.3.2
> Reporter: Elias Levy
>
> We use DataDog for metrics. DD and Flink differ somewhat in how they track metrics.
> Flink names and scopes metrics together, at least by default. E.g. by default the System scope for operator metrics is {{<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>}}.
> The scope variables become part of the metric's full name.
> In DD the metric would be named something generic, e.g. {{taskmanager.job.operator}}, and they would be distinguished by their tag values, e.g. {{tm_id=foo}}, {{job_name=var}}, {{operator_name=baz}}.
> Flink allows you to configure the format string for system scopes, so it is possible to set the operator scope format to {{taskmanager.job.operator}}.
> We do this for all scopes:
> {code}
> metrics.scope.jm: jobmanager
> metrics.scope.jm.job: jobmanager.job
> metrics.scope.tm: taskmanager
> metrics.scope.tm.job: taskmanager.job
> metrics.scope.task: taskmanager.job.task
> metrics.scope.operator: taskmanager.job.operator
> {code}
> This seems to work. The DataDog Flink metrics plugin submits all scope variables as tags, even if they are not used within the scope format. And it appears that internally this does not lead to metrics conflicting with each other.
> We would like to extend this to user-defined metrics, but you cannot define variables/scopes when adding a metric group or metric with the user API. With that ability, in DD we would have a single metric with a tag that takes many different values, rather than hundreds of metrics for the one value we want to measure across different event types.
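The naming difference described above can be illustrated with a small sketch. It is plain Java and does not use Flink's metrics classes; `ScopeFormatSketch` and `formatScope` are hypothetical names, and the placeholder syntax `<name>` is assumed to follow the scope format strings quoted in the issue.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration, not Flink's scope format implementation.
final class ScopeFormatSketch {

    // Replaces each <variable> placeholder in the format with its value.
    static String formatScope(String format, Map<String, String> variables) {
        String result = format;
        for (Map.Entry<String, String> e : variables.entrySet()) {
            result = result.replace("<" + e.getKey() + ">", e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> vars = new LinkedHashMap<>();
        vars.put("tm_id", "foo");
        vars.put("job_name", "var");
        vars.put("operator_name", "baz");

        // Variables embedded in the name: one distinct metric name per combination.
        System.out.println(formatScope("taskmanager.<tm_id>.<job_name>.<operator_name>", vars));

        // Constant name: the variables travel separately, e.g. as tags.
        System.out.println(formatScope("taskmanager.job.operator", vars) + " tags=" + vars);
    }
}
```

With the constant format every operator reports under the same name, so the variables are the only thing that distinguishes one series from another.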
[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222891#comment-16222891 ] Bowen Li commented on FLINK-6951: - The workaround is: rather than using httpclient-4.2.5 and httpcore-4.2.5 as described [here|https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html#flink-for-hadoop-27], I used httpclient-4.3.6 and httpcore-4.3.3. I'm not sure why it works now. Maybe it is because we upgraded KPL and KCL, and they don't use the old httpcomponents APIs anymore? [~aljoscha] [~tzulitai] > Incompatible versions of httpcomponents jars for Flink kinesis connector > > > Key: FLINK-6951 > URL: https://issues.apache.org/jira/browse/FLINK-6951 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector > Affects Versions: 1.3.0 > Reporter: Ted Yu > Assignee: Bowen Li > Priority: Critical > Fix For: 1.4.0, 1.3.3 > > > In the following thread, Bowen reported incompatible versions of > httpcomponents jars for the Flink kinesis connector: > http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector > We should find a solution such that users don't have to change dependency > version(s) themselves when building the Flink kinesis connector.
[jira] [Created] (FLINK-7938) support addAll() in ListState
Bowen Li created FLINK-7938: --- Summary: support addAll() in ListState Key: FLINK-7938 URL: https://issues.apache.org/jira/browse/FLINK-7938 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Reporter: Bowen Li Assignee: Bowen Li Fix For: 1.5.0 Support {{addAll()}} in {{ListState}}, so Flink can add elements to {{ListState}} in batches more efficiently. This should give much better performance, especially for {{ListState}} backed by RocksDB.
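A rough sketch of why a batch call could help. `CountingListState` is a hypothetical in-memory stand-in, not Flink's `ListState`, and the idea that each call maps to one backend write is an assumption made for illustration, not a statement about the RocksDB backend.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not Flink code.
final class AddAllSketch {

    // In-memory list state that counts how many "backend writes" it performed.
    static final class CountingListState<T> {
        private final List<T> elements = new ArrayList<>();
        private int backendWrites = 0;

        // One write per element (assumed cost model).
        void add(T value) {
            elements.add(value);
            backendWrites++;
        }

        // One write for the whole batch (assumed cost model).
        void addAll(List<T> values) {
            if (values.isEmpty()) {
                return;
            }
            elements.addAll(values);
            backendWrites++;
        }

        int backendWrites() {
            return backendWrites;
        }

        List<T> get() {
            return new ArrayList<>(elements);
        }
    }

    public static void main(String[] args) {
        List<Integer> batch = Arrays.asList(1, 2, 3, 4);

        CountingListState<Integer> oneByOne = new CountingListState<>();
        for (Integer value : batch) {
            oneByOne.add(value);
        }

        CountingListState<Integer> batched = new CountingListState<>();
        batched.addAll(batch);

        System.out.println("add() writes: " + oneByOne.backendWrites());
        System.out.println("addAll() writes: " + batched.backendWrites());
    }
}
```

Both variants end with the same list contents; only the number of calls into the backend differs.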
[jira] [Updated] (FLINK-7475) support update() in ListState
[ https://issues.apache.org/jira/browse/FLINK-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bowen Li updated FLINK-7475:
----------------------------
Summary: support update() in ListState (was: ListState support update)

> support update() in ListState
> -----------------------------
>
> Key: FLINK-7475
> URL: https://issues.apache.org/jira/browse/FLINK-7475
> Project: Flink
> Issue Type: Improvement
> Components: Core, DataStream API
> Reporter: yf
> Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> If I want to update the list, I have to do it in two steps:
> listState.clear();
> for (Element e : myList) {
>     listState.add(e);
> }
> Why can't I update the state in one call:
> listState.update(myList)?
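The request can be made concrete with a minimal sketch. `SimpleListState` is a hypothetical in-memory stand-in, not Flink's `ListState`, and `update` here shows only the behavior being asked for (replace the whole content in one call), not how Flink would implement it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not Flink code.
final class ListStateUpdateSketch {

    static final class SimpleListState<T> {
        private final List<T> elements = new ArrayList<>();

        void add(T value) {
            elements.add(value);
        }

        void clear() {
            elements.clear();
        }

        // Requested convenience: replace the current content with the given list.
        void update(List<T> values) {
            elements.clear();
            elements.addAll(values);
        }

        List<T> get() {
            return new ArrayList<>(elements);
        }
    }

    // The two-step workaround described in the issue.
    static <T> void replaceWithClearAndAdd(SimpleListState<T> state, List<T> values) {
        state.clear();
        for (T value : values) {
            state.add(value);
        }
    }

    public static void main(String[] args) {
        List<String> myList = Arrays.asList("a", "b", "c");

        SimpleListState<String> twoStep = new SimpleListState<>();
        twoStep.add("stale");
        replaceWithClearAndAdd(twoStep, myList);

        SimpleListState<String> oneCall = new SimpleListState<>();
        oneCall.add("stale");
        oneCall.update(myList);

        System.out.println(twoStep.get().equals(oneCall.get()));
    }
}
```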
[jira] [Commented] (FLINK-7929) Add unit/integration tests for states backed by RocksDB
[ https://issues.apache.org/jira/browse/FLINK-7929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222868#comment-16222868 ] Bowen Li commented on FLINK-7929: - [~aljoscha] Thanks for pointing that out! I'm closing this ticket > Add unit/integration tests for states backed by RocksDB > --- > > Key: FLINK-7929 > URL: https://issues.apache.org/jira/browse/FLINK-7929 > Project: Flink > Issue Type: Test > Components: State Backends, Checkpointing, Tests >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0 > > > While exploring how to implement FLINK-7475, I didn't find any existing unit > tests (or there are but I didn't find them...) that I can easily run to test > if {{RocksDB(Value/List/Map/...)State}} works. > We should add unit/integration tests for {{RocksDB(Value/List/Map/...)State}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7929) Add unit/integration tests for states backed by RocksDB
[ https://issues.apache.org/jira/browse/FLINK-7929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li closed FLINK-7929. --- Resolution: Invalid > Add unit/integration tests for states backed by RocksDB > --- > > Key: FLINK-7929 > URL: https://issues.apache.org/jira/browse/FLINK-7929 > Project: Flink > Issue Type: Test > Components: State Backends, Checkpointing, Tests >Reporter: Bowen Li >Assignee: Bowen Li > Fix For: 1.4.0 > > > While exploring how to implement FLINK-7475, I didn't find any existing unit > tests (or there are but I didn't find them...) that I can easily run to test > if {{RocksDB(Value/List/Map/...)State}} works. > We should add unit/integration tests for {{RocksDB(Value/List/Map/...)State}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222855#comment-16222855 ] ASF GitHub Bot commented on FLINK-6225: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r147442488 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -205,12 +207,16 @@ private CassandraSink(SingleOutputStreamOperator sink) { * @param input type * @return CassandraSinkBuilder, to further configure the sink */ - public staticCassandraSinkBuilder addSink(DataStream input) { + public static CassandraSinkBuilder addSink(DataStream input) { TypeInformation typeInfo = input.getType(); if (typeInfo instanceof TupleTypeInfo) { - DataStream tupleInput = (DataStream) input; + DataStream tupleInput = (DataStream) input; return (CassandraSinkBuilder) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig())); } + if (typeInfo instanceof RowTypeInfo) { --- End diff -- Can check here for a concrete class with `Row.class.equals(typeInfo.getTypeClass())` to also support `GenericType` as well. > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.4.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222852#comment-16222852 ] ASF GitHub Bot commented on FLINK-6225: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r147456818 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -375,6 +381,34 @@ protected void sanityCheck() { } /** +* Builder for a {@link CassandraRowSink}. +*/ + public static class CassandraRowSinkBuilder extends CassandraSinkBuilder { + public CassandraRowSinkBuilder(DataStream input, TypeInformation typeInfo, TypeSerializer serializer) { + super(input, typeInfo, serializer); + } + + @Override + protected void sanityCheck() { + super.sanityCheck(); + if (query == null || query.length() == 0) { + throw new IllegalArgumentException("Query must not be null or empty."); + } + } + + @Override + protected CassandraSink createSink() throws Exception { + return new CassandraSink<>(input.addSink(new CassandraRowSink(query, builder)).name("Cassandra Sink")); + + } + + @Override + protected CassandraSink createWriteAheadSink() throws Exception { + throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types."); --- End diff -- I had a look at `CassandraTupleWriteAheadSink`. It would be straightforward to copy and adapt it for `Row`. > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.4.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r147517745 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.sinks.AppendStreamTableSink; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.Properties; + +/** + * A cassandra {@link AppendStreamTableSink}. --- End diff -- `cassandra` -> `Cassandra` ---
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r147442488 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -205,12 +207,16 @@ private CassandraSink(SingleOutputStreamOperator sink) { * @param input type * @return CassandraSinkBuilder, to further configure the sink */ - public staticCassandraSinkBuilder addSink(DataStream input) { + public static CassandraSinkBuilder addSink(DataStream input) { TypeInformation typeInfo = input.getType(); if (typeInfo instanceof TupleTypeInfo) { - DataStream tupleInput = (DataStream) input; + DataStream tupleInput = (DataStream) input; return (CassandraSinkBuilder) new CassandraTupleSinkBuilder<>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig())); } + if (typeInfo instanceof RowTypeInfo) { --- End diff -- Can check here for a concrete class with `Row.class.equals(typeInfo.getTypeClass())` to also support `GenericType` as well. ---
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222853#comment-16222853 ] ASF GitHub Bot commented on FLINK-6225: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r147507230 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.sinks.AppendStreamTableSink; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.Properties; + +/** + * A cassandra {@link AppendStreamTableSink}. 
+ */ +public class CassandraTableSink implements AppendStreamTableSink { --- End diff -- rename to `CassandraAppendTableSink` > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.4.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222854#comment-16222854 ] ASF GitHub Bot commented on FLINK-6225: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r147517745 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.sinks.AppendStreamTableSink; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.Properties; + +/** + * A cassandra {@link AppendStreamTableSink}. 
--- End diff -- `cassandra` -> `Cassandra` > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.4.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r147507230 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraTableSink.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.sinks.AppendStreamTableSink; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import java.util.Properties; + +/** + * A cassandra {@link AppendStreamTableSink}. + */ +public class CassandraTableSink implements AppendStreamTableSink { --- End diff -- rename to `CassandraAppendTableSink` ---
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r147456818 --- Diff: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java --- @@ -375,6 +381,34 @@ protected void sanityCheck() { } /** +* Builder for a {@link CassandraRowSink}. +*/ + public static class CassandraRowSinkBuilder extends CassandraSinkBuilder { + public CassandraRowSinkBuilder(DataStream input, TypeInformation typeInfo, TypeSerializer serializer) { + super(input, typeInfo, serializer); + } + + @Override + protected void sanityCheck() { + super.sanityCheck(); + if (query == null || query.length() == 0) { + throw new IllegalArgumentException("Query must not be null or empty."); + } + } + + @Override + protected CassandraSink createSink() throws Exception { + return new CassandraSink<>(input.addSink(new CassandraRowSink(query, builder)).name("Cassandra Sink")); + + } + + @Override + protected CassandraSink createWriteAheadSink() throws Exception { + throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types."); --- End diff -- I had a look at `CassandraTupleWriteAheadSink`. It would be straightforward to copy and adapt it for `Row`. ---
[GitHub] flink pull request #3748: [FLINK-6225] [Cassandra Connector] add CassandraTa...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r147517530 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -392,6 +425,45 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception { } @Test + public void testCassandraTableSink() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataStreamSource source = env.fromCollection(rowCollection); + CassandraTableSink cassandraTableSink = new CassandraTableSink(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT)).build(); + } + }, injectTableName(INSERT_DATA_QUERY), new Properties()); + CassandraTableSink newCassandrTableSink = cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES); + + newCassandrTableSink.emitDataStream(source); + + env.execute(); + ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY)); + Assert.assertEquals(20, rs.all().size()); + } + + @Test + public void testCassandraTableSinkE2E() throws Exception { --- End diff -- I think `testCassandraTableSink` can be removed in favor of this test. ---
[jira] [Commented] (FLINK-6225) Support Row Stream for CassandraSink
[ https://issues.apache.org/jira/browse/FLINK-6225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222856#comment-16222856 ] ASF GitHub Bot commented on FLINK-6225: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3748#discussion_r147517530 --- Diff: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java --- @@ -392,6 +425,45 @@ public void testCassandraPojoAtLeastOnceSink() throws Exception { } @Test + public void testCassandraTableSink() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataStreamSource source = env.fromCollection(rowCollection); + CassandraTableSink cassandraTableSink = new CassandraTableSink(new ClusterBuilder() { + @Override + protected Cluster buildCluster(Cluster.Builder builder) { + return builder.addContactPointsWithPorts(new InetSocketAddress(HOST, PORT)).build(); + } + }, injectTableName(INSERT_DATA_QUERY), new Properties()); + CassandraTableSink newCassandrTableSink = cassandraTableSink.configure(FIELD_NAMES, FIELD_TYPES); + + newCassandrTableSink.emitDataStream(source); + + env.execute(); + ResultSet rs = session.execute(injectTableName(SELECT_DATA_QUERY)); + Assert.assertEquals(20, rs.all().size()); + } + + @Test + public void testCassandraTableSinkE2E() throws Exception { --- End diff -- I think `testCassandraTableSink` can be removed in favor of this test. > Support Row Stream for CassandraSink > > > Key: FLINK-6225 > URL: https://issues.apache.org/jira/browse/FLINK-6225 > Project: Flink > Issue Type: New Feature > Components: Cassandra Connector >Affects Versions: 1.3.0 >Reporter: Jing Fan >Assignee: Haohui Mai > Fix For: 1.4.0 > > > Currently in CassandraSink, specifying query is not supported for row-stream. > The solution should be similar to CassandraTupleSink. 
[jira] [Commented] (FLINK-5967) Add RuntimeContext#getAggregatingState() and document AggregatingState
[ https://issues.apache.org/jira/browse/FLINK-5967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222821#comment-16222821 ] ASF GitHub Bot commented on FLINK-5967: --- Github user bowenli86 closed the pull request at: https://github.com/apache/flink/pull/4899 > Add RuntimeContext#getAggregatingState() and document AggregatingState > -- > > Key: FLINK-5967 > URL: https://issues.apache.org/jira/browse/FLINK-5967 > Project: Flink > Issue Type: New Feature > Components: DataStream API, Documentation >Reporter: Aljoscha Krettek >Assignee: Bowen Li >Priority: Critical > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4899: [FLINK-5967][DataStream API][Doc] Add RuntimeConte...
Github user bowenli86 closed the pull request at: https://github.com/apache/flink/pull/4899 ---
[GitHub] flink pull request #4904: [hotfix] reorder the methods so they conform to th...
Github user bowenli86 closed the pull request at: https://github.com/apache/flink/pull/4904 ---
[jira] [Updated] (FLINK-7873) Introduce HybridStreamHandle to optimize the recovery mechanism and try to read the checkpoint data locally
[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sihua Zhou updated FLINK-7873:
------------------------------
Summary: Introduce HybridStreamHandle to optimize the recovery mechanism and try to read the checkpoint data locally (was: Introduce HybridStreamStateHandle for quick recovery from checkpoint.)

> Introduce HybridStreamHandle to optimize the recovery mechanism and try to read the checkpoint data locally
> ------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-7873
> URL: https://issues.apache.org/jira/browse/FLINK-7873
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Affects Versions: 1.3.2
> Reporter: Sihua Zhou
> Assignee: Sihua Zhou
>
> The current recovery strategy always reads checkpoint data from the remote FileStream (HDFS). This costs a lot of bandwidth when the state is large (e.g. 1 TB). What's worse, if a job recovers again and again, it can eat up all network bandwidth and badly hurt the cluster. So I propose caching the checkpoint data locally and reading it from the local cache whenever possible; we read from remote only if the local read fails. The advantage is that if an execution is assigned to the same TaskManager as before, it saves a lot of bandwidth and recovers faster.
> Key issues:
> 1. Cache the checkpoint data on local disk and manage its creation and deletion.
> 2. Introduce a HybridStreamHandle which tries to create a local input stream first and, if that fails, creates a remote input stream. Its prototype looks like this:
> {code:java}
> class HybridStreamHandle {
>    private StreamStateHandle localHandle;
>    private StreamStateHandle remoteHandle;
>    ..
>    public FSDataInputStream openInputStream() throws IOException {
>        FSDataInputStream inputStream = localHandle.openInputStream();
>        return inputStream != null ? inputStream : remoteHandle.openInputStream();
>    }
>    .
> }
> {code}
> Solution:
> There are two kinds of solutions I can think of.
> solution1:
> The backend does the caching, and the HybridStreamHandle points to both local and remote data. HybridStreamHandle is managed by the CheckpointCoordinator like any other StreamHandle, so the CheckpointCoordinator disposes it. When HybridStreamHandle is disposed, it calls localHandle.dispose() and remoteHandle.dispose(). In this way, we have to record the TaskManager's info (like location) in localHandle and add an entry point in TaskManager to handle the localHandle dispose message; we also have to consider the HA situation.
> solution2:
> The backend does the caching and manages the cached data itself. It simply uses a TTL-like method to manage disposal of handles: we dispose a handle if it hasn't been touched for a time X. We touch the handles when we recover from a checkpoint or when we perform a checkpoint; once we touch a handle, we reset its TTL. In this way, all the work is done by the backend, and it is transparent to both JobManager and TaskManager. The only problem is that we may dispose a handle that is still useful, but even in this case we can still read from the remote data, and users can avoid this by setting a proper TTL value according to the checkpoint interval and other factors.
> To avoid complicating the problem, I prefer solution2. Can someone give me some advice? I would appreciate it very much~
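The TTL bookkeeping in solution2 could look roughly like the sketch below. It is a guess at the mechanism, not code from the proposal: `LocalCacheTtlSketch`, `touch`, and `disposeExpired` are hypothetical names, handles are represented only by string ids, and the clock is passed in explicitly so the behavior is easy to follow.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical illustration of TTL-based disposal, not Flink code.
final class LocalCacheTtlSketch {

    private final long ttlMillis;
    // Handle id -> time (in ms) at which the handle was last touched.
    private final Map<String, Long> lastTouched = new HashMap<>();

    LocalCacheTtlSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Called when a checkpoint is taken or restored: (re)starts the TTL for the handle.
    void touch(String handleId, long nowMillis) {
        lastTouched.put(handleId, nowMillis);
    }

    // Drops every handle that was not touched within the TTL; returns how many were dropped.
    int disposeExpired(long nowMillis) {
        int disposed = 0;
        Iterator<Map.Entry<String, Long>> it = lastTouched.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() > ttlMillis) {
                it.remove();
                disposed++;
            }
        }
        return disposed;
    }

    boolean isCached(String handleId) {
        return lastTouched.containsKey(handleId);
    }

    public static void main(String[] args) {
        LocalCacheTtlSketch cache = new LocalCacheTtlSketch(1000L);
        cache.touch("chk-1", 0L);
        cache.touch("chk-2", 0L);
        cache.touch("chk-2", 900L); // touched again, so its TTL restarts
        System.out.println("disposed: " + cache.disposeExpired(1500L));
        System.out.println("chk-2 cached: " + cache.isCached("chk-2"));
    }
}
```

A handle that expires here would only lose its local copy; as the proposal notes, the remote data can still be read in that case.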
[jira] [Resolved] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure
[ https://issues.apache.org/jira/browse/FLINK-7844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann resolved FLINK-7844.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 1.4.0

Fixed via 5231c9300c26895118b3277bc833536e92dcc6d1

> Fine Grained Recovery triggers checkpoint timeout failure
> ---------------------------------------------------------
>
>                 Key: FLINK-7844
>                 URL: https://issues.apache.org/jira/browse/FLINK-7844
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Zhenzhong Xu
>            Assignee: Till Rohrmann
>             Fix For: 1.4.0
>
>         Attachments: screenshot-1.png
>
>
> Context:
> We are using the "individual" (fine-grained) failover recovery strategy for our embarrassingly parallel router use case. The topic has over 2000 partitions, and parallelism is set to ~180, dispatched to over 20 task managers with around 180 slots.
> Observations:
> We've noticed that after one task manager terminates, individual recovery happens correctly and the workload is re-dispatched to a newly available task manager instance. However, the checkpoint takes 10 minutes to eventually time out, leaving all other task managers unable to commit checkpoints. In a worst-case scenario, if the job is restarted for other reasons (e.g. job manager termination), more messages would be re-processed/duplicated compared to a job without fine-grained recovery enabled.
> I suspect that the uber checkpoint was waiting for a previous checkpoint that was initiated by the old task manager, and thus took a long time to time out.
> Two questions:
> 1. Is there a configuration that controls this checkpoint timeout?
> 2. Is there any reason why, when the Job Manager realizes that the Task Manager is gone and the workload is redispatched, it still needs to wait for the checkpoint initiated by the old task manager?
> Checkpoint screenshot in attachments.
[jira] [Commented] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure
[ https://issues.apache.org/jira/browse/FLINK-7844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16222726#comment-16222726 ]

ASF GitHub Bot commented on FLINK-7844:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4844

> Fine Grained Recovery triggers checkpoint timeout failure
> ---------------------------------------------------------
>
>                 Key: FLINK-7844
>                 URL: https://issues.apache.org/jira/browse/FLINK-7844
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Zhenzhong Xu
>            Assignee: Zhenzhong Xu
>             Fix For: 1.4.0
>
>         Attachments: screenshot-1.png
>
>
> Context:
> We are using the "individual" (fine-grained) failover recovery strategy for our embarrassingly parallel router use case. The topic has over 2000 partitions, and parallelism is set to ~180, dispatched to over 20 task managers with around 180 slots.
> Observations:
> We've noticed that after one task manager terminates, individual recovery happens correctly and the workload is re-dispatched to a newly available task manager instance. However, the checkpoint takes 10 minutes to eventually time out, leaving all other task managers unable to commit checkpoints. In a worst-case scenario, if the job is restarted for other reasons (e.g. job manager termination), more messages would be re-processed/duplicated compared to a job without fine-grained recovery enabled.
> I suspect that the uber checkpoint was waiting for a previous checkpoint that was initiated by the old task manager, and thus took a long time to time out.
> Two questions:
> 1. Is there a configuration that controls this checkpoint timeout?
> 2. Is there any reason why, when the Job Manager realizes that the Task Manager is gone and the workload is redispatched, it still needs to wait for the checkpoint initiated by the old task manager?
> Checkpoint screenshot in attachments.
[jira] [Updated] (FLINK-7873) Introduce HybridStreamStateHandle for quick recovery from checkpoint.
[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-7873: -- Description: Current recover strategy will always read checkpoint data from remote FileStream (HDFS). This will cost a lot of bandwidth when the state is so big (e.g. 1T). What's worse, if this job performs recover again and again, it can eat up all network bandwidth and do a huge hurt to cluster. So, I proposed that we can cache the checkpoint data locally, and read checkpoint data from local cache as well as we can, we read the data from remote only if we fail locally. The advantage is that if a execution is assigned to the same TaskManager as before, it can save a lot of bandwidth, and obtain a faster recover. Key issues: 1. Cache the checkpoint data on local disk and manage it's create and delete. 2. introduce a HybridStreamStateHandler which try to create a local input stream first, if failed, it then create a remote input stream, it prototype looks like below: {code:java} class HybridStreamHandle { private StreamStateHandle localHandle; private StreamStateHandle remoteHandle; .. public FSDataInputStream openInputStream() throws IOException { FSDataInputStream inputStream = localHandle.openInputStream(); return inputStream != null ? inputStream : remoteHandle.openInputStream(); } . } {code} Solution: There are two kind solutions I can think of. solution1: Backend do the cached job, and the HybridStreamHandle point to both local and remote data, HybridStreamHandle is managed by CheckpointCoordinator as well as other StreamHandle, so CheckpointCoordinator will perform dispose on it. when HybridStreamHandle performs dispose it call localHandle.dispose() and remoteHandle.dispose(). In this way, we have to record TaskManager's info (like location) in localHandle and add an entry in TaskManager to handle localHandle dispose message, we also have to consider the HA situation. 
solution2: Backend do the cached job and manage the cached data itself. It simple use a TTL-like method to manage handle's dispose, we dispose a handle if it wasn't be touched for a X time. We will touch the handles when we recover from checkpoint or when we performs a checkpoint, once we touch a handle we reset the TTL for it. In this way, all jobs is done by Backend, it transparent to both JobManager and TaskManager. The only problem is that we may dispose a handle that maybe useful, but even in this case, we can read from remote data finally, and users can avoid this by set a proper TTL value according to checkpoint interval and other things. Consider trying not to complicate the problem reasons, i prefer to use the solution2. Can someone give me some advice? I would appreciate it very much~ was: Current recover strategy will always read checkpoint data from remote FileStream (HDFS). This will cost a lot of bandwidth when the state is so big (e.g. 1T). What's worse, if this job performs recover again and again, it can eat up all network bandwidth and do a huge hurt to cluster. So, I proposed that we can cache the checkpoint data locally, and read checkpoint data from local cache as well as we can, we read the data from remote only if we fail locally. The advantage is that if a execution is assigned to the same TaskManager as before, it can save a lot of bandwidth, and obtain a faster recover. Key issues: 1. Cache the checkpoint data on local disk and manage it's create and delete. 2. introduce a HybridStreamStateHandler which try to create a local input stream first, if failed, it then create a remote input stream, it prototype looks like below: {code:java} class HybridStreamHandle { private StreamStateHandle localHandle; private StreamStateHandle remoteHandle; .. public FSDataInputStream openInputStream() throws IOException { FSDataInputStream inputStream = localHandle.openInputStream(); return inputStream != null ? 
inputStream : remoteHandle.openInputStream(); } . } {code} Solution: There are two kind solutions I can think of. solution1: Backend do the cached job, and the HybridStreamHandle point to both local and remote data, HybridStreamHandle is managed by CheckpointCoordinator as well as other StreamHandle, so CheckpointCoordinator will perform dispose on it. when HybridStreamHandle performs dispose it call localHandle.dispose() and remoteHandle.dispose(). In this way, we have to record TaskManager's info (like location) in localHandle and add an entry in TaskManager to handle localHandle dispose message, we also have to consider the HA situation. solution2: Backend do the cached job and manage the cached data itself. It simple use a TTL-like method to manage handle's dispose, we dispose a handle if it wasn't be touched for a {code}X{code} time. We
[jira] [Assigned] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure
[ https://issues.apache.org/jira/browse/FLINK-7844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann reassigned FLINK-7844:
------------------------------------
    Assignee: Till Rohrmann  (was: Zhenzhong Xu)

> Fine Grained Recovery triggers checkpoint timeout failure
> ---------------------------------------------------------
>
>                 Key: FLINK-7844
>                 URL: https://issues.apache.org/jira/browse/FLINK-7844
>             Project: Flink
>          Issue Type: Bug
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Zhenzhong Xu
>            Assignee: Till Rohrmann
>             Fix For: 1.4.0
>
>         Attachments: screenshot-1.png
>
>
> Context:
> We are using the "individual" (fine-grained) failover recovery strategy for our embarrassingly parallel router use case. The topic has over 2000 partitions, and parallelism is set to ~180, dispatched to over 20 task managers with around 180 slots.
> Observations:
> We've noticed that after one task manager terminates, individual recovery happens correctly and the workload is re-dispatched to a newly available task manager instance. However, the checkpoint takes 10 minutes to eventually time out, leaving all other task managers unable to commit checkpoints. In a worst-case scenario, if the job is restarted for other reasons (e.g. job manager termination), more messages would be re-processed/duplicated compared to a job without fine-grained recovery enabled.
> I suspect that the uber checkpoint was waiting for a previous checkpoint that was initiated by the old task manager, and thus took a long time to time out.
> Two questions:
> 1. Is there a configuration that controls this checkpoint timeout?
> 2. Is there any reason why, when the Job Manager realizes that the Task Manager is gone and the workload is redispatched, it still needs to wait for the checkpoint initiated by the old task manager?
> Checkpoint screenshot in attachments.
[GitHub] flink pull request #4844: [FLINK-7844] [ckPt] Fail unacknowledged pending ch...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4844 ---
[jira] [Updated] (FLINK-7873) Introduce HybridStreamStateHandle for quick recovery from checkpoint.
[ https://issues.apache.org/jira/browse/FLINK-7873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sihua Zhou updated FLINK-7873: -- Description: Current recover strategy will always read checkpoint data from remote FileStream (HDFS). This will cost a lot of bandwidth when the state is so big (e.g. 1T). What's worse, if this job performs recover again and again, it can eat up all network bandwidth and do a huge hurt to cluster. So, I proposed that we can cache the checkpoint data locally, and read checkpoint data from local cache as well as we can, we read the data from remote only if we fail locally. The advantage is that if a execution is assigned to the same TaskManager as before, it can save a lot of bandwidth, and obtain a faster recover. Key issues: 1. Cache the checkpoint data on local disk and manage it's create and delete. 2. introduce a HybridStreamStateHandler which try to create a local input stream first, if failed, it then create a remote input stream, it prototype looks like below: {code:java} class HybridStreamHandle { private StreamStateHandle localHandle; private StreamStateHandle remoteHandle; .. public FSDataInputStream openInputStream() throws IOException { FSDataInputStream inputStream = localHandle.openInputStream(); return inputStream != null ? inputStream : remoteHandle.openInputStream(); } . } {code} Solution: There are two kind solutions I can think of. solution1: Backend do the cached job, and the HybridStreamHandle point to both local and remote data, HybridStreamHandle is managed by CheckpointCoordinator as well as other StreamHandle, so CheckpointCoordinator will perform dispose on it. when HybridStreamHandle performs dispose it call localHandle.dispose() and remoteHandle.dispose(). In this way, we have to record TaskManager's info (like location) in localHandle and add an entry in TaskManager to handle localHandle dispose message, we also have to consider the HA situation. 
solution2: Backend do the cached job and manage the cached data itself. It simple use a TTL-like method to manage handle's dispose, we dispose a handle if it wasn't be touched for a {code}X{code} time. We will touch the handles when we recover from checkpoint or when we performs a checkpoint, once we touch a handle we reset the TTL for it. In this way, all jobs is done by Backend, it transparent to both JobManager and TaskManager. The only problem is that we may dispose a handle that maybe useful, but even in this case, we can read from remote data finally, and users can avoid this by set a proper TTL value according to checkpoint interval and other things. Consider trying not to complicate the problem reasons, i prefer to use the solution2. Can someone give me some advice? I would appreciate it very much~ was: Current recovery strategy will always read checkpoint data from remote FileStream (HDFS). This will cost a lot of network when the state is so big (e.g. 1T), this cost can be saved by reading the checkpoint data from local disk. So i introduce a HybridStreamStateHandler which try to create a local input stream first, if failed, it then create a remote input stream, it prototype looks like below: {code:java} class HybridStreamHandle { private FileStateHandle localHandle; private FileStateHandle remoteHandle; .. public FSDataInputStream openInputStream() throws IOException { FSDataInputStream inputStream = localHandle.openInputStream(); return inputStream != null ? inputStream : remoteHandle.openInputStream(); } . } {code} > Introduce HybridStreamStateHandle for quick recovery from checkpoint. > - > > Key: FLINK-7873 > URL: https://issues.apache.org/jira/browse/FLINK-7873 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 >Reporter: Sihua Zhou >Assignee: Sihua Zhou > > Current recover strategy will always read checkpoint data from remote > FileStream (HDFS). 
This will cost a lot of bandwidth when the state is so big > (e.g. 1T). What's worse, if this job performs recover again and again, it can > eat up all network bandwidth and do a huge hurt to cluster. So, I proposed > that we can cache the checkpoint data locally, and read checkpoint data from > local cache as well as we can, we read the data from remote only if we fail > locally. The advantage is that if a execution is assigned to the same > TaskManager as before, it can save a lot of bandwidth, and obtain a faster > recover. > Key issues: > 1. Cache the checkpoint data on local disk and manage it's create and delete. > 2. introduce a HybridStreamStateHandler which try to create a local input > stream first, if
[jira] [Commented] (FLINK-7153) Eager Scheduling can't allocate source for ExecutionGraph correctly
[ https://issues.apache.org/jira/browse/FLINK-7153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222717#comment-16222717 ] ASF GitHub Bot commented on FLINK-7153: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4916 [FLINK-7153] Re-introduce preferred locations for scheduling ## What is the purpose of the change This PR makes the `TaskManagerLocation` accessible for asynchronous scheduling. Due to changes for Flink 1.3 where we introduced asynchronous scheduling, it was not always guaranteed that the scheduler knew about the scheduling locations of producer tasks. Especially the eager scheduling mode was affected since the slot allocation happened concurrently. In order to fix this problem, this PR adds a `TaskManagerLocationFuture` to each `Execution`. In eager scheduling mode, a slot will only be requested for a task if all its inputs have a slot assigned (e.g. their `TaskManagerLocationFuture` is completed). In lazy scheduling mode, we don't wait for the completion of all inputs, but take those inputs which are already known. In order to distinguish whether we want to wait for all or take all available task manager locations, we add a `LocationPreferenceConstraint` which has the values `ALL` and `ANY`. `ALL` means that we have to wait for all inputs to have a location assigned, and `ANY` means that we take what's currently known. In order to not deploy slots prematurely in eager mode, the slot assignment has been factored out into its own step. Before, one had to call `Execution#deployToSlot(SimpleSlot)` which assigned the given slot and started the deployment. Now, one has to call `Execution#tryAssignResource` before one can call `Execution#deploy`. Moreover this PR fixes that the `FailoverRegions` are topologically sorted which is important for non queued scheduling. 
FYI @StephanEwen ## Brief change log - Introduce `LocationPreferenceConstraint` to distinguish the waiting behaviour for the preferred locations - Split slot assignment and deployment into two separate steps - Moved preferred location calculation into the Execution to reduce code duplication between the `Scheduler` and the `SlotPool` - Changed preferred location calculation to be blocking if `LocationPreferenceConstraint#ALL` and not all input locations are known ## Verifying this change This change added tests and can be verified as follows: - Added `ExecutionTest` to check the correct assigned slot release in case of cancellation and to check the correct preferred location calculation - Added `ExecutionGraphDeploymentTest#testEagerSchedulingWaitsOnAllInputPreferredLocations` to check that eager scheduling waits for all inputs to be assigned before scheduling consumer tasks - Moreover, the scheduler is being tested by existing tests such as `SchedulerSlotSharingTest`, `ScheduleWithCoLocationHintTest` and many IT cases for lazy scheduling (batch case) ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixGroupScheduling2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4916.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4916 commit 32eb1812583b84d80091d1a278d53ed663d8a065 Author: TillDate: 2017-10-16T12:04:13Z [FLINK-7153] Re-introduce preferred locations for scheduling commit 8c0c9aeaa7ca995247f2b9f9e63723e52d839a12 Author: Till Rohrmann Date: 2017-10-27T07:47:03Z [FLINK-7153] Introduce LocationPreferenceConstraint for scheduling The LocationPreferenceConstraint defines whether all or any preferred locations have to be taken into consideration when scheduling tasks. Especially for batch jobs where we do lazy scheduling not all input locations might be known for a consumer task. Therefore, we set the location preference constraint to any which means that only those location are taken into
[GitHub] flink pull request #4916: [FLINK-7153] Re-introduce preferred locations for ...
GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/4916

    [FLINK-7153] Re-introduce preferred locations for scheduling

    ## What is the purpose of the change

    This PR makes the `TaskManagerLocation` accessible for asynchronous scheduling.

    Due to changes for Flink 1.3 where we introduced asynchronous scheduling, it was not always guaranteed that the scheduler knew about the scheduling locations of producer tasks. The eager scheduling mode in particular was affected, since the slot allocation happened concurrently.

    In order to fix this problem, this PR adds a `TaskManagerLocationFuture` to each `Execution`. In eager scheduling mode, a slot will only be requested for a task if all its inputs have a slot assigned (i.e. their `TaskManagerLocationFuture` is completed). In lazy scheduling mode, we don't wait for the completion of all inputs, but take those inputs which are already known.

    In order to distinguish whether we want to wait for all or take all available task manager locations, we add a `LocationPreferenceConstraint` which has the values `ALL` and `ANY`. `ALL` means that we have to wait for all inputs to have a location assigned, and `ANY` means that we take what's currently known.

    In order to not deploy slots prematurely in eager mode, the slot assignment has been factored out into its own step. Before, one had to call `Execution#deployToSlot(SimpleSlot)` which assigned the given slot and started the deployment. Now, one has to call `Execution#tryAssignResource` before one can call `Execution#deploy`.

    Moreover, this PR ensures that the `FailoverRegions` are topologically sorted, which is important for non-queued scheduling.

    FYI @StephanEwen

    ## Brief change log

    - Introduce `LocationPreferenceConstraint` to distinguish the waiting behaviour for the preferred locations
    - Split slot assignment and deployment into two separate steps
    - Moved preferred location calculation into the Execution to reduce code duplication between the `Scheduler` and the `SlotPool`
    - Changed preferred location calculation to be blocking if `LocationPreferenceConstraint#ALL` and not all input locations are known

    ## Verifying this change

    This change added tests and can be verified as follows:

    - Added `ExecutionTest` to check the correct assigned slot release in case of cancellation and to check the correct preferred location calculation
    - Added `ExecutionGraphDeploymentTest#testEagerSchedulingWaitsOnAllInputPreferredLocations` to check that eager scheduling waits for all inputs to be assigned before scheduling consumer tasks
    - Moreover, the scheduler is being tested by existing tests such as `SchedulerSlotSharingTest`, `ScheduleWithCoLocationHintTest` and many IT cases for lazy scheduling (batch case)

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (no)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
    - The serializers: (no)
    - The runtime per-record code paths (performance sensitive): (no)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)

    ## Documentation

    - Does this pull request introduce a new feature? (no)
    - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixGroupScheduling2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4916.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4916

commit 32eb1812583b84d80091d1a278d53ed663d8a065
Author: Till
Date:   2017-10-16T12:04:13Z

    [FLINK-7153] Re-introduce preferred locations for scheduling

commit 8c0c9aeaa7ca995247f2b9f9e63723e52d839a12
Author: Till Rohrmann
Date:   2017-10-27T07:47:03Z

    [FLINK-7153] Introduce LocationPreferenceConstraint for scheduling

    The LocationPreferenceConstraint defines whether all or any preferred locations
    have to be taken into consideration when scheduling tasks. Especially for batch
    jobs where we do lazy scheduling not all input locations might be known for a
    consumer task. Therefore, we set the location preference constraint to any which
    means that only those location are taken into consideration which are known at
    scheduling time.

commit c821e67529deaaed96f183fc22bc0a9fe246fa23
Author: Till Rohrmann
Date:   2017-10-26T16:22:43Z

    [hotfix] Make failover region topological sorted

commit
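The `ALL` versus `ANY` behaviour described in the pull request can be illustrated with plain `CompletableFuture`s. The following is only a sketch under stated assumptions, not the code from the PR: the class `PreferredLocationsSketch`, the method `preferredLocations`, and the use of `String` in place of `TaskManagerLocation` are all hypothetical names chosen for illustration. Only the enum name and its two values come from the PR text.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class PreferredLocationsSketch {

    /** Same value names as in the PR description; everything else here is illustrative. */
    enum LocationPreferenceConstraint { ALL, ANY }

    /**
     * ALL: the returned future completes only once every input location is known.
     * ANY: the returned future is already complete and holds the locations known right now.
     */
    static CompletableFuture<List<String>> preferredLocations(
            Collection<CompletableFuture<String>> inputLocations,
            LocationPreferenceConstraint constraint) {
        if (constraint == LocationPreferenceConstraint.ALL) {
            CompletableFuture<Void> allKnown = CompletableFuture.allOf(
                    inputLocations.toArray(new CompletableFuture<?>[0]));
            return allKnown.thenApply(ignored -> collectKnown(inputLocations));
        }
        return CompletableFuture.completedFuture(collectKnown(inputLocations));
    }

    /** Collects the values of all futures that have already completed normally. */
    private static List<String> collectKnown(Collection<CompletableFuture<String>> futures) {
        List<String> known = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            if (future.isDone() && !future.isCompletedExceptionally()) {
                known.add(future.getNow(null));
            }
        }
        return known;
    }
}
```

With one completed and one pending input, `ANY` yields a one-element list immediately, whereas the `ALL` future stays incomplete until the pending input is assigned a location. This mirrors the eager (wait for all inputs) and lazy (take what is known) scheduling modes named in the description.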
[jira] [Closed] (FLINK-7924) Fix incorrect names of checkpoint options
[ https://issues.apache.org/jira/browse/FLINK-7924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen closed FLINK-7924.
-------------------------------

> Fix incorrect names of checkpoint options
> -----------------------------------------
>
>                 Key: FLINK-7924
>                 URL: https://issues.apache.org/jira/browse/FLINK-7924
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.4.0
>
>
> Checkpoint options are incorrectly always called 'FULL_CHECKPOINT' when, actually, the checkpoints may always be incremental and only savepoints have to be full and self-contained.
> Initially, we planned to add options for multiple checkpoints, like checkpoints that were forced to be full, and checkpoints that were incremental. That is not necessary at this point.
[jira] [Resolved] (FLINK-7924) Fix incorrect names of checkpoint options
[ https://issues.apache.org/jira/browse/FLINK-7924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-7924.
---------------------------------
    Resolution: Fixed

Fixed in fe3b276818eec1d4a70632a45343d70dc2be53f3

> Fix incorrect names of checkpoint options
> -----------------------------------------
>
>                 Key: FLINK-7924
>                 URL: https://issues.apache.org/jira/browse/FLINK-7924
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.4.0
>
>
> Checkpoint options are incorrectly always called 'FULL_CHECKPOINT' when, actually, the checkpoints may always be incremental and only savepoints have to be full and self-contained.
> Initially, we planned to add options for multiple checkpoints, like checkpoints that were forced to be full, and checkpoints that were incremental. That is not necessary at this point.
[jira] [Commented] (FLINK-7765) Enable dependency convergence
[ https://issues.apache.org/jira/browse/FLINK-7765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16222619#comment-16222619 ]

ASF GitHub Bot commented on FLINK-7765:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4777

    Looks good - there is still the open question of whether we add dependency convergence by default, and deactivate it in the modules that are not yet done. That gives the completed modules a "done" feeling and kind of doubles as a "todo" for the remaining modules.

> Enable dependency convergence
> -----------------------------
>
>                 Key: FLINK-7765
>                 URL: https://issues.apache.org/jira/browse/FLINK-7765
>             Project: Flink
>          Issue Type: Improvement
>          Components: Build System
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>
> For motivation check https://issues.apache.org/jira/browse/FLINK-7739
> SubTasks of this task depend on one another - to enable convergence in `flink-runtime` it has to be enabled for `flink-shaded-hadoop` first.
[GitHub] flink issue #4777: [FLINK-7765] Enable dependency convergence
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4777

    Looks good - there is still the open question of whether we add dependency convergence by default, and deactivate it in the modules that are not yet done. That gives the completed modules a "done" feeling and kind of doubles as a "todo" for the remaining modules.

---
[GitHub] flink issue #4914: [hotfix] [docs] Fix typos in types serialization document...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4914 Good fix, merging... ---
[GitHub] flink issue #4913: [hotfix] [javadoc] Fix typo in Javadoc of ManagedSnapshot...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4913 Thanks, good catch! Merging... ---
[GitHub] flink issue #4912: [hotfix] [docs] Fix broken downloads page url
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4912 Thanks, merging... ---
[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish
[ https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222551#comment-16222551 ] ASF GitHub Bot commented on FLINK-7838: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/4915 > Kafka011ProducerExactlyOnceITCase do not finish > --- > > Key: FLINK-7838 > URL: https://issues.apache.org/jira/browse/FLINK-7838 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > Attachments: initTransactions_deadlock.txt, log.txt > > > See attached log -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4915: [FLINK-7838] Bunch of hotfixes and fix missing synchroniz...
Github user GJL commented on the issue: https://github.com/apache/flink/pull/4915 ---
[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish
[ https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16222550#comment-16222550 ]

ASF GitHub Bot commented on FLINK-7838:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147442314

    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -563,7 +553,7 @@ public void close() throws Exception {
     			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
     		}
     		try {
    -			producersPool.close();
    +			producersPool.ifPresent(pool -> pool.close());
    --- End diff --

    I am not adamant about it but using `Optional` in private fields is not without controversy:
    https://stackoverflow.com/a/26328555

    Also, `ifPresent(pool -> pool.close())` only works because `close` does not declare any checked exceptions. If it did, the code would not compile.

> Kafka011ProducerExactlyOnceITCase do not finish
> -----------------------------------------------
>
>                 Key: FLINK-7838
>                 URL: https://issues.apache.org/jira/browse/FLINK-7838
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Blocker
>             Fix For: 1.4.0
>
>         Attachments: initTransactions_deadlock.txt, log.txt
>
>
> See attached log
[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4915#discussion_r147442314

    --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java ---
    @@ -563,7 +553,7 @@ public void close() throws Exception {
     			asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
     		}
     		try {
    -			producersPool.close();
    +			producersPool.ifPresent(pool -> pool.close());
    --- End diff --

    I am not adamant about it but using `Optional` in private fields is not without controversy:
    https://stackoverflow.com/a/26328555

    Also, `ifPresent(pool -> pool.close())` only works because `close` does not declare any checked exceptions. If it did, the code would not compile.

---
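The point about checked exceptions in the review comment can be shown in isolation. `Optional#ifPresent` takes a `java.util.function.Consumer`, whose `accept` method declares no checked exceptions, so a lambda that calls a `close()` declaring one has to catch it itself. The sketch below assumes a hypothetical `Pool` class whose `close()` throws `IOException`; it is not the producer pool from `FlinkKafkaProducer011`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;

public class OptionalCloseSketch {

    /** Hypothetical resource whose close() declares a checked exception. */
    static final class Pool {
        boolean closed;

        void close() throws IOException {
            closed = true;
        }
    }

    static void closeIfPresent(Optional<Pool> pool) {
        // pool.ifPresent(p -> p.close()) would not compile here, because
        // Consumer#accept cannot throw the checked IOException.
        pool.ifPresent(p -> {
            try {
                p.close();
            } catch (IOException e) {
                // Rethrow unchecked so the caller still sees the failure.
                throw new UncheckedIOException(e);
            }
        });
    }
}
```

A plain null check or an `if (pool.isPresent())` block around `pool.get().close()` avoids the wrapping entirely, which is one reason the `Optional` field is debatable in this spot.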
[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish
[ https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222522#comment-16222522 ] ASF GitHub Bot commented on FLINK-7838: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4915#discussion_r147436687 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java --- @@ -483,11 +478,6 @@ public void setLogFailuresOnly(boolean logFailuresOnly) { */ @Override public void open(Configuration configuration) throws Exception { - if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) { --- End diff -- What is the benefit of moving this into `initializeState()`? > Kafka011ProducerExactlyOnceITCase do not finish > --- > > Key: FLINK-7838 > URL: https://issues.apache.org/jira/browse/FLINK-7838 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > Attachments: initTransactions_deadlock.txt, log.txt > > > See attached log -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...
Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/4915#discussion_r147436687 --- Diff: flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java --- @@ -483,11 +478,6 @@ public void setLogFailuresOnly(boolean logFailuresOnly) { */ @Override public void open(Configuration configuration) throws Exception { - if (semantic != Semantic.NONE && !((StreamingRuntimeContext) this.getRuntimeContext()).isCheckpointingEnabled()) { --- End diff -- What is the benefit of moving this into `initializeState()`? ---
[GitHub] flink issue #4894: [FLINK-7548] [table] Improve rowtime support of TableSour...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4894 Added documentation for the new TableSource interfaces. ---
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222519#comment-16222519 ] ASF GitHub Bot commented on FLINK-7548: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4894 Added documentation for the new TableSource interfaces. > Support watermark generation for TableSource > > > Key: FLINK-7548 > URL: https://issues.apache.org/jira/browse/FLINK-7548 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Fabian Hueske >Priority: Blocker > Fix For: 1.4.0 > > > As discussed in FLINK-7446, currently the TableSource only support to define > rowtime field, but not support to extract watermarks from the rowtime field. > We can provide a new interface called {{DefinedWatermark}}, which has two > methods {{getRowtimeAttribute}} (can only be an existing field) and > {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked > deprecated. > How to support periodic and punctuated watermarks and support some built-in > strategies needs further discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7937) Add pagination to Flink History view
Andrew Roberts created FLINK-7937: - Summary: Add pagination to Flink History view Key: FLINK-7937 URL: https://issues.apache.org/jira/browse/FLINK-7937 Project: Flink Issue Type: Improvement Components: History Server, JobManager Reporter: Andrew Roberts We have enough historical jobs that the browser chokes when trying to render them all on one page. The history server should have pagination added, so it's only trying to render some small subset at a time. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish
[ https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222494#comment-16222494 ] ASF GitHub Bot commented on FLINK-7838: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4915 @GJL @aljoscha could you take a look? > Kafka011ProducerExactlyOnceITCase do not finish > --- > > Key: FLINK-7838 > URL: https://issues.apache.org/jira/browse/FLINK-7838 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > Attachments: initTransactions_deadlock.txt, log.txt > > > See attached log -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4915: [FLINK-7838] Bunch of hotfixes and fix missing synchroniz...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4915 @GJL @aljoscha could you take a look? ---
[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish
[ https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222450#comment-16222450 ] Piotr Nowojski commented on FLINK-7838: --- The main problem in this issue is this bug in Kafka: https://issues.apache.org/jira/browse/KAFKA-6132 [~aljoscha] I'm not sure what we should do now in that case. I tried bumping our connector to work with Kafka 1.0.0-rc3 and this deadlock error was gone; however, the upgrade broke the consumer side of our connector. > Kafka011ProducerExactlyOnceITCase do not finish > --- > > Key: FLINK-7838 > URL: https://issues.apache.org/jira/browse/FLINK-7838 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > Attachments: initTransactions_deadlock.txt, log.txt > > > See attached log -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7838) Kafka011ProducerExactlyOnceITCase do not finish
[ https://issues.apache.org/jira/browse/FLINK-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222448#comment-16222448 ] ASF GitHub Bot commented on FLINK-7838: --- GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4915 [FLINK-7838] Bunch of hotfixes and fix missing synchronization in FlinkKafkaProducer011 ## What is the purpose of the change The most important commit is the one adding missing synchronization, the lack of which might have been the cause of some deadlocks on Travis. The others are just non-critical hotfixes. ## Brief change log Please check individual commit messages. ## Verifying this change This change is already covered by existing Kafka 0.11 connector tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**yes** / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink f7838 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4915.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4915 commit 4e0492595f497a49c63b8ffddcc66e720e4e4433 Author: Piotr Nowojski Date: 2017-10-24T15:35:56Z [hotfix][kafka] Bump Kafka 0.11 dependency This might include some bugfixes commit e38b3461bc97a175bf67f1072b2e8a2a891c1f1a Author: Piotr Nowojski Date: 2017-10-24T15:57:05Z [FLINK-7838][kafka] Add missing synchronization in FlinkKafkaProducer commit 04127b9c44807f5379e07d801847d993c39e94b1 Author: Piotr Nowojski Date: 2017-10-26T08:02:15Z [hotfix][kafka] Fix FlinkKafkaProducer011 logger commit 8b47ac214c4022563be8128e84bc02d5de98819c Author: Piotr Nowojski Date: 2017-10-27T13:11:24Z [hotfix][kafka-tests] Fix test names so that they are not ignored by mvn build commit a6c4c8bbdbfc5c238557e151fa8598e71a562411 Author: Piotr Nowojski Date: 2017-10-25T16:08:46Z [hotfix][kafka] Move checkpointing enable checking to initializeState initializeState is called before open and since both of those functions rely on the chosen semantic, that means checkpointing enable check should happen in initializeState. commit 055e5d125df895fd010e1171d1d39f37177518a2 Author: Piotr Nowojski Date: 2017-10-27T13:14:58Z [hotfix][kafka] Remove unsued field in FlinkKafkaProducer011 commit 6cf55ed8977135af01099452962962199e253348 Author: Piotr Nowojski Date: 2017-10-27T13:47:26Z [hotfix][kafka] Do not return producers to a pool in abort for non EXACTLY_ONCE modes Previously on abort(...) producers were returned to the pool. 
This was a minor bug, probably without any negative side effect; however, this patch fixes it and adds additional sanity checks to guard against similar bugs in the future. > Kafka011ProducerExactlyOnceITCase do not finish > --- > > Key: FLINK-7838 > URL: https://issues.apache.org/jira/browse/FLINK-7838 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > Attachments: initTransactions_deadlock.txt, log.txt > > > See attached log -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4915: [FLINK-7838] Bunch of hotfixes and fix missing syn...
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4915 [FLINK-7838] Bunch of hotfixes and fix missing synchronization in FlinkKafkaProducer011 ## What is the purpose of the change The most important commit is the one adding missing synchronization, the lack of which might have been the cause of some deadlocks on Travis. The others are just non-critical hotfixes. ## Brief change log Please check individual commit messages. ## Verifying this change This change is already covered by existing Kafka 0.11 connector tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**yes** / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink f7838 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4915.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4915 commit 4e0492595f497a49c63b8ffddcc66e720e4e4433 Author: Piotr Nowojski Date: 2017-10-24T15:35:56Z [hotfix][kafka] Bump Kafka 0.11 dependency This might include some bugfixes commit e38b3461bc97a175bf67f1072b2e8a2a891c1f1a Author: Piotr Nowojski Date: 2017-10-24T15:57:05Z [FLINK-7838][kafka] Add missing synchronization in FlinkKafkaProducer commit 04127b9c44807f5379e07d801847d993c39e94b1 Author: Piotr Nowojski Date: 2017-10-26T08:02:15Z [hotfix][kafka] Fix FlinkKafkaProducer011 logger commit 8b47ac214c4022563be8128e84bc02d5de98819c Author: Piotr Nowojski Date: 2017-10-27T13:11:24Z [hotfix][kafka-tests] Fix test names so that they are not ignored by mvn build commit a6c4c8bbdbfc5c238557e151fa8598e71a562411 Author: Piotr Nowojski Date: 2017-10-25T16:08:46Z [hotfix][kafka] Move checkpointing enable checking to initializeState initializeState is called before open and since both of those functions rely on the chosen semantic, that means checkpointing enable check should happen in initializeState. commit 055e5d125df895fd010e1171d1d39f37177518a2 Author: Piotr Nowojski Date: 2017-10-27T13:14:58Z [hotfix][kafka] Remove unsued field in FlinkKafkaProducer011 commit 6cf55ed8977135af01099452962962199e253348 Author: Piotr Nowojski Date: 2017-10-27T13:47:26Z [hotfix][kafka] Do not return producers to a pool in abort for non EXACTLY_ONCE modes Previously on abort(...) producers were returned to the pool. 
This was a minor bug, probably without any negative side effect; however, this patch fixes it and adds additional sanity checks to guard against similar bugs in the future. ---
[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss
[ https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222429#comment-16222429 ] ASF GitHub Bot commented on FLINK-7737: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4876 Yes, I know what you mean. However, including `outStream` in `hashCode` and `equals` wouldn't add any quirks. > On HCFS systems, FSDataOutputStream does not issue hsync only hflush which > leads to data loss > - > > Key: FLINK-7737 > URL: https://issues.apache.org/jira/browse/FLINK-7737 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.3.2 > Environment: Dev >Reporter: Ryan Hobbs >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > > During several tests where we simulated failure conditions, we have observed > that on HCFS systems where the data stream is of type FSDataOutputStream, > Flink will issue hflush() and not hsync() which results in data loss. > In the class *StreamWriterBase.java* the code below will execute hsync if the > output stream is of type *HdfsDataOutputStream* but not for streams of type > *FSDataOutputStream*. Is this by design? > {code} > protected void hflushOrSync(FSDataOutputStream os) throws IOException { > try { > // At this point the refHflushOrSync cannot be null, > // since register method would have thrown if it was. 
> this.refHflushOrSync.invoke(os); > if (os instanceof HdfsDataOutputStream) { > ((HdfsDataOutputStream) > os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); > } > } catch (InvocationTargetException e) { > String msg = "Error while trying to hflushOrSync!"; > LOG.error(msg + " " + e.getCause()); > Throwable cause = e.getCause(); > if (cause != null && cause instanceof IOException) { > throw (IOException) cause; > } > throw new RuntimeException(msg, e); > } catch (Exception e) { > String msg = "Error while trying to hflushOrSync!"; > LOG.error(msg + " " + e); > throw new RuntimeException(msg, e); > } > } > {code} > Could a potential fix me to perform a sync even on streams of type > *FSDataOutputStream*? > {code} > if (os instanceof HdfsDataOutputStream) { > ((HdfsDataOutputStream) > os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); > } else if (os instanceof FSDataOutputStream) { > os.hsync(); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4876: [FLINK-7737][filesystem] Add syncOnFlush flag to StreamWr...
Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4876 Yes, I know what you mean. However, including `outStream` in `hashCode` and `equals` wouldn't add any quirks. ---
[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID
[ https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222427#comment-16222427 ] ASF GitHub Bot commented on FLINK-7876: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4872 This PR does not strictly depend on the `MetricFetcher` changes. The reason why they are included is simply that I wanted to directly adapt the `MetricFetcher`, which I touched in #4852. It just turned out later that I would be fixing FLINK-7100 with this PR as well. I actually think that #4852 can also be merged into the release branch. However, if you insist, then I can try to disentangle this PR from the `MetricFetcher` changes. > Register TaskManagerMetricGroup under ResourceID instead of InstanceID > -- > > Key: FLINK-7876 > URL: https://issues.apache.org/jira/browse/FLINK-7876 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under > its {{InstanceID}} and thereby binding its metrics effectively to the > lifetime of its registration with the {{JobManager}}. This has also > implications how the REST handler retrieve the TaskManager metrics, namely by > its {{InstanceID}}. > I would actually propose to register the {{TaskManagerMetricGroup}} under the > {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole > lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be > able to query metrics independent of the connection status to the > {{JobManager}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4872: [FLINK-7876] Register TaskManagerMetricGroup under Resour...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4872 This PR does not strictly depend on the `MetricFetcher` changes. The reason why they are included is simply that I wanted to directly adapt the `MetricFetcher`, which I touched in #4852. It just turned out later that I would be fixing FLINK-7100 with this PR as well. I actually think that #4852 can also be merged into the release branch. However, if you insist, then I can try to disentangle this PR from the `MetricFetcher` changes. ---
[jira] [Assigned] (FLINK-7051) Bump up Calcite version to 1.14
[ https://issues.apache.org/jira/browse/FLINK-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reassigned FLINK-7051: Assignee: Fabian Hueske (was: Haohui Mai) > Bump up Calcite version to 1.14 > --- > > Key: FLINK-7051 > URL: https://issues.apache.org/jira/browse/FLINK-7051 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Fabian Hueske >Priority: Critical > Fix For: 1.4.0 > > > This is an umbrella issue for all tasks that need to be done once Apache > Calcite 1.14 is released. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7051) Bump up Calcite version to 1.14
[ https://issues.apache.org/jira/browse/FLINK-7051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reassigned FLINK-7051: Assignee: Haohui Mai (was: Fabian Hueske) > Bump up Calcite version to 1.14 > --- > > Key: FLINK-7051 > URL: https://issues.apache.org/jira/browse/FLINK-7051 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Timo Walther >Assignee: Haohui Mai >Priority: Critical > Fix For: 1.4.0 > > > This is an umbrella issue for all tasks that need to be done once Apache > Calcite 1.14 is released. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID
[ https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222399#comment-16222399 ] ASF GitHub Bot commented on FLINK-7876: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r147416486 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java --- @@ -239,7 +239,15 @@ public void shutdown() { if (queryService != null) { stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); - stopFuture = Patterns.gracefulStop(queryService, stopTimeout); + + try { + stopFuture = Patterns.gracefulStop(queryService, stopTimeout); + } catch (IllegalStateException ignored) { + // this can happen if the underlying actor system has been stopped before shutting + // the metric registry down + // TODO: Pull the MetricQueryService actor out of the MetricRegistry + LOG.debug("Cannot gracefully stop the metric query service actor."); --- End diff -- Giving it a second look, I deliberately did not include the exception, because it can only happen if the `ActorSystem` has been shut down before. I'll change the debug log message instead. > Register TaskManagerMetricGroup under ResourceID instead of InstanceID > -- > > Key: FLINK-7876 > URL: https://issues.apache.org/jira/browse/FLINK-7876 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under > its {{InstanceID}} and thereby binding its metrics effectively to the > lifetime of its registration with the {{JobManager}}. This has also > implications how the REST handler retrieve the TaskManager metrics, namely by > its {{InstanceID}}. 
> I would actually propose to register the {{TaskManagerMetricGroup}} under the > {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole > lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be > able to query metrics independent of the connection status to the > {{JobManager}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r147416486 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java --- @@ -239,7 +239,15 @@ public void shutdown() { if (queryService != null) { stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); - stopFuture = Patterns.gracefulStop(queryService, stopTimeout); + + try { + stopFuture = Patterns.gracefulStop(queryService, stopTimeout); + } catch (IllegalStateException ignored) { + // this can happen if the underlying actor system has been stopped before shutting + // the metric registry down + // TODO: Pull the MetricQueryService actor out of the MetricRegistry + LOG.debug("Cannot gracefully stop the metric query service actor."); --- End diff -- Giving it a second look, I deliberately did not include the exception, because it can only happen if the `ActorSystem` has been shut down before. I'll change the debug log message instead. ---
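The pattern in the quoted diff, tolerating an `IllegalStateException` during shutdown because the owning system may already be gone, can be sketched without Akka as follows. This is only an illustration under stated assumptions: `FakeSystem`, its `gracefulStop()` method and `stopQuietly` are hypothetical stand-ins invented for this example, not the `MetricRegistryImpl` code and not the Akka API.

```java
import java.util.concurrent.CompletableFuture;

public class ShutdownSketch {

    /** Hypothetical stand-in for a service hosted by a system that may already be terminated. */
    static class FakeSystem {
        volatile boolean terminated;

        /** Assumed behaviour: fails fast if the owning system was stopped first. */
        CompletableFuture<Boolean> gracefulStop() {
            if (terminated) {
                throw new IllegalStateException("system already terminated");
            }
            return CompletableFuture.completedFuture(true);
        }
    }

    /**
     * Requests a graceful stop. If the system is already terminated there is
     * nothing left to stop, so the exception is swallowed on purpose and a
     * completed future carrying 'false' is returned instead.
     */
    static CompletableFuture<Boolean> stopQuietly(FakeSystem system) {
        try {
            return system.gracefulStop();
        } catch (IllegalStateException ignored) {
            // Expected only when shutdown order was reversed; the stack trace
            // carries no extra information, so it is not logged here.
            return CompletableFuture.completedFuture(false);
        }
    }

    public static void main(String[] args) {
        FakeSystem running = new FakeSystem();
        FakeSystem stopped = new FakeSystem();
        stopped.terminated = true;

        System.out.println(stopQuietly(running).join());
        System.out.println(stopQuietly(stopped).join());
    }
}
```

Returning a completed future in the failure branch keeps the caller's code path uniform, which is one possible design; the quoted diff instead logs at debug level and continues.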
[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID
[ https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222395#comment-16222395 ] ASF GitHub Bot commented on FLINK-7876: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r147415854 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java --- @@ -239,7 +239,15 @@ public void shutdown() { if (queryService != null) { stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); - stopFuture = Patterns.gracefulStop(queryService, stopTimeout); + + try { + stopFuture = Patterns.gracefulStop(queryService, stopTimeout); + } catch (IllegalStateException ignored) { + // this can happen if the underlying actor system has been stopped before shutting + // the metric registry down + // TODO: Pull the MetricQueryService actor out of the MetricRegistry + LOG.debug("Cannot gracefully stop the metric query service actor."); --- End diff -- good point > Register TaskManagerMetricGroup under ResourceID instead of InstanceID > -- > > Key: FLINK-7876 > URL: https://issues.apache.org/jira/browse/FLINK-7876 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under > its {{InstanceID}} and thereby binding its metrics effectively to the > lifetime of its registration with the {{JobManager}}. This has also > implications how the REST handler retrieve the TaskManager metrics, namely by > its {{InstanceID}}. > I would actually propose to register the {{TaskManagerMetricGroup}} under the > {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole > lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be > able to query metrics independent of the connection status to the > {{JobManager}}. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r147415854 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java --- @@ -239,7 +239,15 @@ public void shutdown() { if (queryService != null) { stopTimeout = new FiniteDuration(1L, TimeUnit.SECONDS); - stopFuture = Patterns.gracefulStop(queryService, stopTimeout); + + try { + stopFuture = Patterns.gracefulStop(queryService, stopTimeout); + } catch (IllegalStateException ignored) { + // this can happen if the underlying actor system has been stopped before shutting + // the metric registry down + // TODO: Pull the MetricQueryService actor out of the MetricRegistry + LOG.debug("Cannot gracefully stop the metric query service actor."); --- End diff -- good point ---
[jira] [Commented] (FLINK-7876) Register TaskManagerMetricGroup under ResourceID instead of InstanceID
[ https://issues.apache.org/jira/browse/FLINK-7876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222394#comment-16222394 ] ASF GitHub Bot commented on FLINK-7876: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r147415810 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java --- @@ -113,7 +113,7 @@ protected void startClusterComponents(Configuration configuration, RpcService rp RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, + MetricRegistryImpl metricRegistry, --- End diff -- You're right. I'll go over the different places again and try to fix it. > Register TaskManagerMetricGroup under ResourceID instead of InstanceID > -- > > Key: FLINK-7876 > URL: https://issues.apache.org/jira/browse/FLINK-7876 > Project: Flink > Issue Type: Improvement > Components: Metrics >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > Currently, the {{TaskManager}} registers the {{TaskManagerMetricGroup}} under > its {{InstanceID}} and thereby binding its metrics effectively to the > lifetime of its registration with the {{JobManager}}. This has also > implications how the REST handler retrieve the TaskManager metrics, namely by > its {{InstanceID}}. > I would actually propose to register the {{TaskManagerMetricGroup}} under the > {{TaskManager}}/{{TaskExecutor}} {{ResourceID}} which is valid over the whole > lifetime of the {{TaskManager}}/{{TaskExecutor}}. That way we would also be > able to query metrics independent of the connection status to the > {{JobManager}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4872: [FLINK-7876] Register TaskManagerMetricGroup under...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4872#discussion_r147415810 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/entrypoint/MesosSessionClusterEntrypoint.java --- @@ -113,7 +113,7 @@ protected void startClusterComponents(Configuration configuration, RpcService rp RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, - MetricRegistry metricRegistry, + MetricRegistryImpl metricRegistry, --- End diff -- You're right. I'll go over the different places again and try to fix it. ---
[jira] [Commented] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure
[ https://issues.apache.org/jira/browse/FLINK-7844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222388#comment-16222388 ] ASF GitHub Bot commented on FLINK-7844: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4844 Thanks for the review @StephanEwen. I will address your comments and then merge this PR. > Fine Grained Recovery triggers checkpoint timeout failure > - > > Key: FLINK-7844 > URL: https://issues.apache.org/jira/browse/FLINK-7844 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0, 1.3.2 >Reporter: Zhenzhong Xu >Assignee: Zhenzhong Xu > Attachments: screenshot-1.png > > > Context: > We are using "individual" failover (fine-grained) recovery strategy for our > embarrassingly parallel router use case. The topic has over 2000 partitions, > and parallelism is set to ~180 that dispatched to over 20 task managers with > around 180 slots. > Observations: > We've noticed after one task manager termination, even though the individual > recovery happens correctly, that the workload was re-dispatched to a new > available task manager instance. However, the checkpoint would take 10 mins > to eventually timeout, causing all other task managers not able to commit > checkpoints. In a worst-case scenario, if job got restarted for other reasons > (i.e. job manager termination), that would cause more messages to be > re-processed/duplicates compared to the job without fine-grained recovery > enabled. > I am suspecting that uber checkpoint was waiting for a previous checkpoint > that initiated by the old task manager and thus taking a long time to time > out. > Two questions: > 1. Is there a configuration that controls this checkpoint timeout? > 2. Is there any reason that when Job Manager realizes that Task Manager is > gone and workload is redispatched, it still need to wait for the checkpoint > initiated by the old task manager? 
> Checkpoint screenshot in attachments. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7844) Fine Grained Recovery triggers checkpoint timeout failure
[ https://issues.apache.org/jira/browse/FLINK-7844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222387#comment-16222387 ] ASF GitHub Bot commented on FLINK-7844: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4844#discussion_r147415027 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -1270,6 +1272,42 @@ public void run() { } /** +* Discards the given pending checkpoint because of the given cause. +* +* @param pendingCheckpoint to discard +* @param cause for discarding the checkpoint +*/ + private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Throwable cause) { + Thread.holdsLock(lock); --- End diff -- Yes definitely. > Fine Grained Recovery triggers checkpoint timeout failure > - > > Key: FLINK-7844 > URL: https://issues.apache.org/jira/browse/FLINK-7844 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.4.0, 1.3.2 >Reporter: Zhenzhong Xu >Assignee: Zhenzhong Xu > Attachments: screenshot-1.png > > > Context: > We are using "individual" failover (fine-grained) recovery strategy for our > embarrassingly parallel router use case. The topic has over 2000 partitions, > and parallelism is set to ~180 that dispatched to over 20 task managers with > around 180 slots. > Observations: > We've noticed after one task manager termination, even though the individual > recovery happens correctly, that the workload was re-dispatched to a new > available task manager instance. However, the checkpoint would take 10 mins > to eventually timeout, causing all other task managers not able to commit > checkpoints. In a worst-case scenario, if job got restarted for other reasons > (i.e. job manager termination), that would cause more messages to be > re-processed/duplicates compared to the job without fine-grained recovery > enabled. 
> I suspect that the uber checkpoint was waiting for a previous checkpoint > that was initiated by the old task manager, and thus took a long time to time > out. > Two questions: > 1. Is there a configuration that controls this checkpoint timeout? > 2. Is there any reason why, when the Job Manager realizes that a Task Manager is > gone and the workload has been redispatched, it still needs to wait for the checkpoint > initiated by the old task manager? > Checkpoint screenshot in attachments. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
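The diff quoted in this thread calls `Thread.holdsLock(lock);` as a bare statement. `Thread.holdsLock` only returns a boolean, so a bare call enforces nothing. The following is a minimal sketch of how such a precondition can be checked; the class `LockGuardDemo` and its methods are hypothetical and are not taken from the Flink pull request.

```java
public class LockGuardDemo {
    private final Object lock = new Object();
    private int discarded;

    // Hypothetical helper that must only run while the caller holds `lock`.
    private void discardLocked() {
        // The boolean result has to be checked explicitly to have any effect.
        if (!Thread.holdsLock(lock)) {
            throw new IllegalStateException("caller must hold the lock");
        }
        discarded++;
    }

    // Correct usage: take the lock, then call the guarded helper.
    public int discardSafely() {
        synchronized (lock) {
            discardLocked();
            return discarded;
        }
    }

    // Incorrect usage: the guard rejects the call because the lock is not held.
    public boolean discardWithoutLockFails() {
        try {
            discardLocked();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        LockGuardDemo demo = new LockGuardDemo();
        System.out.println(demo.discardSafely());           // prints 1
        System.out.println(demo.discardWithoutLockFails()); // prints true
    }
}
```

A lighter variant is `assert Thread.holdsLock(lock);`, which is only evaluated when the JVM runs with assertions enabled (`-ea`).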
[GitHub] flink issue #4844: [FLINK-7844] [ckPt] Fail unacknowledged pending checkpoin...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4844 Thanks for the review @StephanEwen. I will address your comments and then merge this PR. ---
[GitHub] flink pull request #4844: [FLINK-7844] [ckPt] Fail unacknowledged pending ch...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4844#discussion_r147415027 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -1270,6 +1272,42 @@ public void run() { } /** +* Discards the given pending checkpoint because of the given cause. +* +* @param pendingCheckpoint to discard +* @param cause for discarding the checkpoint +*/ + private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Throwable cause) { + Thread.holdsLock(lock); --- End diff -- Yes definitely. ---
[jira] [Commented] (FLINK-7705) Port JobDetailsHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222384#comment-16222384 ] ASF GitHub Bot commented on FLINK-7705: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4884 Hi @zjureel, I'll merge this PR once we forked off the 1.4 release branch. > Port JobDetailsHandler to new REST endpoint > --- > > Key: FLINK-7705 > URL: https://issues.apache.org/jira/browse/FLINK-7705 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > Port existing {{JobDetailsHandler}} to new REST endpoint. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4884: [FLINK-7705] Add JobDetailsHandler
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4884 Hi @zjureel, I'll merge this PR once we forked off the 1.4 release branch. ---
[jira] [Commented] (FLINK-7800) Enable window joins without equi-join predicates
[ https://issues.apache.org/jira/browse/FLINK-7800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222325#comment-16222325 ] Xingcan Cui commented on FLINK-7800: Hi [~fhueske], the problem is not as easy as I expected. The key point is that once we remove the equi-predicate check in {{FlinkLogicalJoin}}, there will be different query plans in the optimization phase. For instance, given the following test expressions: {code} val joinT = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g) {code} two kinds of plans will become candidates: with or without equi-predicates in {{LogicalJoin}}. Worse still, the plans without equi-predicates may have a lower cost (mainly in terms of IO for the DataSet join), and thus be selected as the result. To solve this, we must propose a mechanism to ensure that the plans with equi-predicates are always preferred, regardless of their costs. Maybe that can be implemented by adding a "punishment factor" to plans without equi-predicates (or, vice versa, an "enhancement factor"), but I am concerned that this may break the existing cost model. Do you have some ideas about that? Best, Xingcan > Enable window joins without equi-join predicates > > > Key: FLINK-7800 > URL: https://issues.apache.org/jira/browse/FLINK-7800 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: Fabian Hueske >Assignee: Xingcan Cui > > Currently, windowed joins can only be translated if they have at least one > equi-join predicate. This limitation exists due to the lack of a good cross > join strategy for the DataSet API. > Due to the window, windowed joins do not have to be executed as cross joins. > Hence, the equi-join limitation does not need to be enforced (even though > non-equi joins are executed with a parallelism of 1 right now). 
> We can resolve this issue by adding a boolean flag to the > {{FlinkLogicalJoinConverter}} rule to permit non-equi joins and add such a > rule to the logical optimization set of the DataStream API. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
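The comment on this issue weighs a "punishment factor" on plans without equi-predicates against the requirement that plans with equi-predicates win regardless of cost. The sketch below contrasts the two selection rules on toy data. It is not Calcite or Flink planner code; `Plan`, `pickWithPenalty`, `pickStrict`, and the cost numbers are all hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class JoinPlanChoice {
    // Toy stand-in for a candidate plan; not a planner class.
    static final class Plan {
        final String name;
        final double cost;
        final boolean hasEquiPredicate;

        Plan(String name, double cost, boolean hasEquiPredicate) {
            this.name = name;
            this.cost = cost;
            this.hasEquiPredicate = hasEquiPredicate;
        }
    }

    // Rule A: inflate the cost of plans without equi-predicates by a factor.
    static Plan pickWithPenalty(List<Plan> plans, double penalty) {
        return plans.stream()
                .min(Comparator.comparingDouble(
                        (Plan p) -> p.hasEquiPredicate ? p.cost : p.cost * penalty))
                .get();
    }

    // Rule B: plans with equi-predicates always rank first; cost only breaks ties.
    static Plan pickStrict(List<Plan> plans) {
        return plans.stream()
                .min(Comparator.comparingInt((Plan p) -> p.hasEquiPredicate ? 0 : 1)
                        .thenComparingDouble(p -> p.cost))
                .get();
    }

    static List<Plan> samplePlans() {
        return Arrays.asList(
                new Plan("equi", 100.0, true),
                new Plan("non-equi", 60.0, false));
    }

    public static void main(String[] args) {
        List<Plan> plans = samplePlans();
        System.out.println(pickWithPenalty(plans, 1.5).name); // non-equi (90 < 100)
        System.out.println(pickWithPenalty(plans, 2.0).name); // equi (120 > 100)
        System.out.println(pickStrict(plans).name);           // equi
    }
}
```

With a multiplicative factor the outcome depends on how the factor compares to the cost ratio, which is one way to read the concern about disturbing the cost model; a strict ordering avoids that but no longer expresses the preference as a cost.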
[GitHub] flink pull request #4914: [hotfix] [docs] Fix typos in types serialization d...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/4914 [hotfix] [docs] Fix typos in types serialization documentation This fixes typos in the types serialization documentation. You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink hotfix-docs-types-serilization Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4914.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4914 commit 7e7e5defc7e52ede0c3331a5d30e9228b1f87eaa Author: gyao Date: 2017-10-27T12:55:39Z [hotfix] [docs] Fix typos in types serialization documentation ---
[GitHub] flink pull request #4913: [hotfix] [javadoc] Fix typo in Javadoc of ManagedS...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/4913 [hotfix] [javadoc] Fix typo in Javadoc of ManagedSnapshotContext#getCheckpointId() This fixes a typo in the Javadoc of ManagedSnapshotContext#getCheckpointId(). You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink hotfix-javadoc-ManagedSnapshotContext Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4913.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4913 commit 26aca12920d741aafff01fec3d1ebd9c50177fd2 Author: gyao Date: 2017-10-27T12:48:21Z [hotfix] [javadoc] Fix typo in Javadoc of ManagedSnapshotContext#getCheckpointId() ---
[GitHub] flink pull request #4912: [hotfix] [docs] Fix broken downloads page url
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/4912 [hotfix] [docs] Fix broken downloads page url This fixes a broken *Downloads Page* url on the AWS page of the documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/aws.html You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink hotfix-docs-aws-downloads-page Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4912.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4912 commit 7369dd69e0fa416b86ce430826b65edf36f67815 Author: gyao Date: 2017-10-27T12:44:32Z [hotfix] [docs] Fix broken downloads page url ---
[jira] [Closed] (FLINK-6173) flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414
[ https://issues.apache.org/jira/browse/FLINK-6173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske closed FLINK-6173. Resolution: Fixed Fix Version/s: 1.3.3 Fixed for 1.3.3 with c3289c9d982292975caded4c45926e9653ce5b63 Fixed for 1.4.0 with 6f83b4131662353ade632af3cc1c479793b33866 > flink-table not pack-in com.fasterxml.jackson.* in after #FLINK-5414 > > > Key: FLINK-6173 > URL: https://issues.apache.org/jira/browse/FLINK-6173 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Zhenghua Gao >Assignee: Timo Walther >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > Currently, flink-table will pack-in com.fasterxml.jackson.* and rename them > to org.apache.flink.shaded.calcite.com.fasterxml.jackson.* > If a project depends on flink-table, and uses fasterxml as follows(function > explain uses fasterxml indirectly): > {code:title=WordCount.scala|borderStyle=solid} > object WordCountWithTable { > def main(args: Array[String]): Unit = { > // set up execution environment > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", > 1)) > val expr = input.toTable(tEnv) > val result = expr > .groupBy('word) > .select('word, 'frequency.sum as 'frequency) > .filter('frequency === 2) > println(tEnv.explain(result)) > result.toDataSet[WC].print() > } > case class WC(word: String, frequency: Long) > } > {code} > It actually uses org.apache.flink.shaded.calcite.com.fasterxml.jackson.* > I found after FLINK-5414, flink-table didn't pack-in com.fasterxml.jackson.* > and the project would throw class not found exception. 
> {code:borderStyle=solid} > Exception in thread "main" java.lang.NoClassDefFoundError: > org/apache/flink/shaded/calcite/com/fasterxml/jackson/databind/ObjectMapper > at > org.apache.flink.table.explain.PlanJsonParser.getSqlExecutionPlan(PlanJsonParser.java:32) > at > org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:143) > at > org.apache.flink.table.api.BatchTableEnvironment.explain(BatchTableEnvironment.scala:164) > at > org.apache.flink.quickstart.WordCountWithTable$.main(WordCountWithTable.scala:34) > at > org.apache.flink.quickstart.WordCountWithTable.main(WordCountWithTable.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.shaded.calcite.com.fasterxml.jackson.databind.ObjectMapper > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > ... 10 more > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
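The stack trace above comes down to a relocated (shaded) class name that is referenced at runtime but is not present in any jar on the classpath. A small diagnostic sketch, assuming nothing beyond the JDK: the helper `isLoadable` is hypothetical, and the two class names are the ones mentioned in the report.

```java
public class ShadedClassCheck {
    // Returns true if the named class can be found by this class's class loader.
    // The class is located but not initialized (second argument is false).
    static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ShadedClassCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String shaded =
            "org.apache.flink.shaded.calcite.com.fasterxml.jackson.databind.ObjectMapper";
        String unshaded = "com.fasterxml.jackson.databind.ObjectMapper";
        // The result depends entirely on which jars are on the classpath.
        System.out.println("shaded name loadable:   " + isLoadable(shaded));
        System.out.println("unshaded name loadable: " + isLoadable(unshaded));
    }
}
```

Running this with the project's runtime classpath shows which of the two names is actually packaged, which helps tell a missing relocation apart from a missing dependency.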
[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss
[ https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1609#comment-1609 ] ASF GitHub Bot commented on FLINK-7737: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4876 But do you understand what I mean? Semantics of code in the main scope should not be quirked to make assertions in tests shorter to write. Equals/hashCode is usually not implemented on I/O classes, like the output stream, because it is not well defined. > On HCFS systems, FSDataOutputStream does not issue hsync only hflush which > leads to data loss > - > > Key: FLINK-7737 > URL: https://issues.apache.org/jira/browse/FLINK-7737 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.3.2 > Environment: Dev >Reporter: Ryan Hobbs >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > > During several tests where we simulated failure conditions, we have observed > that on HCFS systems where the data stream is of type FSDataOutputStream, > Flink will issue hflush() and not hsync() which results in data loss. > In the class *StreamWriterBase.java* the code below will execute hsync if the > output stream is of type *HdfsDataOutputStream* but not for streams of type > *FSDataOutputStream*. Is this by design? > {code} > protected void hflushOrSync(FSDataOutputStream os) throws IOException { > try { > // At this point the refHflushOrSync cannot be null, > // since register method would have thrown if it was. 
> this.refHflushOrSync.invoke(os); > if (os instanceof HdfsDataOutputStream) { > ((HdfsDataOutputStream) > os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); > } > } catch (InvocationTargetException e) { > String msg = "Error while trying to hflushOrSync!"; > LOG.error(msg + " " + e.getCause()); > Throwable cause = e.getCause(); > if (cause != null && cause instanceof IOException) { > throw (IOException) cause; > } > throw new RuntimeException(msg, e); > } catch (Exception e) { > String msg = "Error while trying to hflushOrSync!"; > LOG.error(msg + " " + e); > throw new RuntimeException(msg, e); > } > } > {code} > Could a potential fix be to perform a sync even on streams of type > *FSDataOutputStream*? > {code} > if (os instanceof HdfsDataOutputStream) { > ((HdfsDataOutputStream) > os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); > } else if (os instanceof FSDataOutputStream) { > os.hsync(); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
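The report turns on the difference between flushing buffered data onward and forcing it to stable storage. As a rough local-filesystem analogy only (this is plain JDK I/O, not the Hadoop `hflush`/`hsync` API), the sketch below shows the same two steps with `FileOutputStream.flush()` and `FileChannel.force(true)`. The class and method names are hypothetical.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class FlushVersusSync {
    // Appends a record and optionally forces it to the storage device.
    // Returns the file length after the write.
    static long append(File file, String record, boolean sync) throws IOException {
        try (FileOutputStream out = new FileOutputStream(file, true)) {
            out.write(record.getBytes(StandardCharsets.UTF_8));
            // flush() hands the bytes on, but they may still sit in OS caches.
            out.flush();
            if (sync) {
                // force(true) asks the OS to write data and metadata to the device.
                out.getChannel().force(true);
            }
        }
        return file.length();
    }

    public static void main(String[] args) throws IOException {
        File file = File.createTempFile("flush-vs-sync", ".log");
        file.deleteOnExit();
        System.out.println(append(file, "abc\n", false)); // flushed only
        System.out.println(append(file, "def\n", true));  // flushed and synced
    }
}
```

Both calls leave the same bytes visible to readers; the difference only shows up after a crash or power loss, which matches the failure-simulation setting described in the report.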
[GitHub] flink issue #4876: [FLINK-7737][filesystem] Add syncOnFlush flag to StreamWr...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/4876 But do you understand what I mean? Semantics of code in the main scope should not be quirked to make assertions in tests shorter to write. Equals/hashCode is usually not implemented on I/O classes, like the output stream, because it is not well defined. ---
[jira] [Assigned] (FLINK-7871) SlotPool should release its unused slot to RM
[ https://issues.apache.org/jira/browse/FLINK-7871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] shuai.xu reassigned FLINK-7871: --- Assignee: shuai.xu > SlotPool should release its unused slot to RM > - > > Key: FLINK-7871 > URL: https://issues.apache.org/jira/browse/FLINK-7871 > Project: Flink > Issue Type: Bug >Reporter: shuai.xu >Assignee: shuai.xu > > As described in the design wiki > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65147077, > _*The SlotPool releases slots that are unused to the ResourceManager. Slots > count as unused if they are not used when the job is fully running (fully > recovered).*_ > Currently, however, the slot pool keeps the slots offered to it until the job > has finished. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7933) Test instability PrometheusReporterTest
[ https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222145#comment-16222145 ] ASF GitHub Bot commented on FLINK-7933: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4908#discussion_r147379207 --- Diff: flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java --- @@ -72,10 +72,30 @@ private final AbstractID taskAttemptId2 = new AbstractID(); private final String[] labelValues2 = {jobId.toString(), taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" + ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2}; - private final TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID); - private final TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME); - private final TaskMetricGroup taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER); - private final TaskMetricGroup taskMetricGroup2 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER); + private TaskMetricGroup taskMetricGroup1; + private TaskMetricGroup taskMetricGroup2; + + private MetricRegistry registry; + private int port; --- End diff -- yup we could do that too. > Test instability PrometheusReporterTest > --- > > Key: FLINK-7933 > URL: https://issues.apache.org/jira/browse/FLINK-7933 > Project: Flink > Issue Type: Bug > Components: Metrics, Tests >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler >Priority: Critical > Labels: test-stability > > Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7933) Test instability PrometheusReporterTest
[ https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222146#comment-16222146 ] ASF GitHub Bot commented on FLINK-7933: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4908#discussion_r147379229 --- Diff: flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java --- @@ -72,10 +72,30 @@ private final AbstractID taskAttemptId2 = new AbstractID(); private final String[] labelValues2 = {jobId.toString(), taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" + ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2}; - private final TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID); - private final TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME); - private final TaskMetricGroup taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER); - private final TaskMetricGroup taskMetricGroup2 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER); + private TaskMetricGroup taskMetricGroup1; + private TaskMetricGroup taskMetricGroup2; + + private MetricRegistry registry; + private int port; + + @Before + public void setupReporter() { + registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500"))); + PrometheusReporter reporter = (PrometheusReporter) registry.getReporters().get(0); + port = reporter.getPort(); + + TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID); + TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, 
jobId, JOB_NAME); + taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER); + taskMetricGroup2 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER); + } + + @After + public void shutdownRegistry() { + if (registry != null) { + registry.shutdown(); --- End diff -- the registry is closing the reporter. > Test instability PrometheusReporterTest > --- > > Key: FLINK-7933 > URL: https://issues.apache.org/jira/browse/FLINK-7933 > Project: Flink > Issue Type: Bug > Components: Metrics, Tests >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler >Priority: Critical > Labels: test-stability > > Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4908: [FLINK-7933][metrics] Improve PrometheusReporter t...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4908#discussion_r147379229 --- Diff: flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java --- @@ -72,10 +72,30 @@ private final AbstractID taskAttemptId2 = new AbstractID(); private final String[] labelValues2 = {jobId.toString(), taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" + ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2}; - private final TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID); - private final TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME); - private final TaskMetricGroup taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER); - private final TaskMetricGroup taskMetricGroup2 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER); + private TaskMetricGroup taskMetricGroup1; + private TaskMetricGroup taskMetricGroup2; + + private MetricRegistry registry; + private int port; + + @Before + public void setupReporter() { + registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500"))); + PrometheusReporter reporter = (PrometheusReporter) registry.getReporters().get(0); + port = reporter.getPort(); + + TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID); + TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME); + taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER); + taskMetricGroup2 = new 
TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER); + } + + @After + public void shutdownRegistry() { + if (registry != null) { + registry.shutdown(); --- End diff -- the registry is closing the reporter. ---
[GitHub] flink pull request #4908: [FLINK-7933][metrics] Improve PrometheusReporter t...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4908#discussion_r147379207 --- Diff: flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java --- @@ -72,10 +72,30 @@ private final AbstractID taskAttemptId2 = new AbstractID(); private final String[] labelValues2 = {jobId.toString(), taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" + ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2}; - private final TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID); - private final TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME); - private final TaskMetricGroup taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER); - private final TaskMetricGroup taskMetricGroup2 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER); + private TaskMetricGroup taskMetricGroup1; + private TaskMetricGroup taskMetricGroup2; + + private MetricRegistry registry; + private int port; --- End diff -- yup we could do that too. ---
[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222143#comment-16222143 ] ASF GitHub Bot commented on FLINK-7878: --- Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/4911 I ran the failed test on my machine and it passes, so it seems my changes do not affect it. > Extend the resource type user can define in ResourceSpec > > > Key: FLINK-7878 > URL: https://issues.apache.org/jira/browse/FLINK-7878 > Project: Flink > Issue Type: Bug > Components: DataSet API, DataStream API >Reporter: shuai.xu >Assignee: shuai.xu > Labels: flip-6 > > Currently, Flink only lets users define how much CPU and memory an operator uses, > but the resources in a cluster are varied. For example, an application for > image processing may need a GPU, and others may need an FPGA. > CPU and memory alone are not enough, and the number of resource types keeps > growing, so we need to make ResourceSpec extensible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
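One way to picture the request in this issue is a resource description that keeps CPU and memory as fixed fields and carries any further resource types in a name-to-amount map. The sketch below is purely illustrative: `ExtensibleResourceSpec`, its fields, and its `merge` method are hypothetical and do not reflect the design in the linked pull request or Flink's actual `ResourceSpec` API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public final class ExtensibleResourceSpec {
    private final double cpuCores;
    private final int memoryMb;
    // Additional resource types (for example "GPU" or "FPGA") keyed by name.
    private final Map<String, Double> extended;

    public ExtensibleResourceSpec(double cpuCores, int memoryMb, Map<String, Double> extended) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
        this.extended = Collections.unmodifiableMap(new HashMap<>(extended));
    }

    public double getCpuCores() {
        return cpuCores;
    }

    public int getMemoryMb() {
        return memoryMb;
    }

    // Amount of a named extended resource, or 0.0 if it was never requested.
    public double getExtended(String name) {
        return extended.getOrDefault(name, 0.0);
    }

    // Combines two specs by summing every resource, including extended ones.
    public ExtensibleResourceSpec merge(ExtensibleResourceSpec other) {
        Map<String, Double> merged = new HashMap<>(extended);
        other.extended.forEach((name, amount) -> merged.merge(name, amount, Double::sum));
        return new ExtensibleResourceSpec(
                cpuCores + other.cpuCores, memoryMb + other.memoryMb, merged);
    }

    public static void main(String[] args) {
        ExtensibleResourceSpec a =
                new ExtensibleResourceSpec(1.0, 512, Collections.singletonMap("GPU", 1.0));
        ExtensibleResourceSpec b =
                new ExtensibleResourceSpec(2.0, 1024, Collections.singletonMap("GPU", 1.0));
        ExtensibleResourceSpec sum = a.merge(b);
        System.out.println(sum.getCpuCores() + " cores, " + sum.getMemoryMb()
                + " MB, GPU=" + sum.getExtended("GPU"));
    }
}
```

Keeping the extra resources in a map means a new resource type needs no new field or method, at the price of losing compile-time checks on resource names.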
[GitHub] flink issue #4911: [FLINK-7878] [api] make resource type extendible in Resou...
Github user shuai-xu commented on the issue: https://github.com/apache/flink/pull/4911 I ran the failed test on my machine and it passes, so it seems my changes do not affect it. ---
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16222013#comment-16222013 ] ASF GitHub Bot commented on FLINK-7548: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4894 Thanks for the feedback @twalthr. I pushed an update. > Support watermark generation for TableSource > > > Key: FLINK-7548 > URL: https://issues.apache.org/jira/browse/FLINK-7548 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Fabian Hueske >Priority: Blocker > Fix For: 1.4.0 > > > As discussed in FLINK-7446, currently the TableSource only support to define > rowtime field, but not support to extract watermarks from the rowtime field. > We can provide a new interface called {{DefinedWatermark}}, which has two > methods {{getRowtimeAttribute}} (can only be an existing field) and > {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked > deprecated. > How to support periodic and punctuated watermarks and support some built-in > strategies needs further discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4894: [FLINK-7548] [table] Improve rowtime support of TableSour...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/4894 Thanks for the feedback @twalthr. I pushed an update. ---
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221986#comment-16221986 ] ASF GitHub Bot commented on FLINK-7548: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r147358782 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -84,6 +86,13 @@ return typeInfo; } + @Override + public TableSchema getTableSchema() { + return new TableSchema( + ((RowTypeInfo) typeInfo).getFieldNames(), --- End diff -- What do you think about adding a `fromTypeInfo` method to the companion object that creates a `TableSchema`? > Support watermark generation for TableSource > > > Key: FLINK-7548 > URL: https://issues.apache.org/jira/browse/FLINK-7548 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Fabian Hueske >Priority: Blocker > Fix For: 1.4.0 > > > As discussed in FLINK-7446, currently the TableSource only support to define > rowtime field, but not support to extract watermarks from the rowtime field. > We can provide a new interface called {{DefinedWatermark}}, which has two > methods {{getRowtimeAttribute}} (can only be an existing field) and > {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked > deprecated. > How to support periodic and punctuated watermarks and support some built-in > strategies needs further discussion. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4894: [FLINK-7548] [table] Improve rowtime support of Ta...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4894#discussion_r147358782 --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java --- @@ -84,6 +86,13 @@ return typeInfo; } + @Override + public TableSchema getTableSchema() { + return new TableSchema( + ((RowTypeInfo) typeInfo).getFieldNames(), --- End diff -- What do you think about adding a `fromTypeInfo` method to the companion object that creates a `TableSchema`? ---
[GitHub] flink pull request #4908: [FLINK-7933][metrics] Improve PrometheusReporter t...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4908#discussion_r147357330 --- Diff: flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java --- @@ -70,9 +69,25 @@ @Rule public ExpectedException thrown = ExpectedException.none(); - private final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "" + NON_DEFAULT_PORT))); - private final FrontMetricGroup metricGroup = new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER)); - private final MetricReporter reporter = registry.getReporters().get(0); + private MetricRegistry registry; + private FrontMetricGroup metricGroup; + private PrometheusReporter reporter; + private int port; --- End diff -- do we need this extra field for that? Can't we access `reporter.getPort()`? Isn't it redundant? ---
[jira] [Commented] (FLINK-7933) Test instability PrometheusReporterTest
[ https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221979#comment-16221979 ] ASF GitHub Bot commented on FLINK-7933: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4908#discussion_r147357745 --- Diff: flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java --- @@ -72,10 +72,30 @@ private final AbstractID taskAttemptId2 = new AbstractID(); private final String[] labelValues2 = {jobId.toString(), taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" + ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2}; - private final TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID); - private final TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME); - private final TaskMetricGroup taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER); - private final TaskMetricGroup taskMetricGroup2 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER); + private TaskMetricGroup taskMetricGroup1; + private TaskMetricGroup taskMetricGroup2; + + private MetricRegistry registry; + private int port; + + @Before + public void setupReporter() { + registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500"))); + PrometheusReporter reporter = (PrometheusReporter) registry.getReporters().get(0); + port = reporter.getPort(); + + TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID); + TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, 
jobId, JOB_NAME); + taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER); + taskMetricGroup2 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER); + } + + @After + public void shutdownRegistry() { + if (registry != null) { + registry.shutdown(); --- End diff -- you are not closing the reporter here. Isn't this a root cause of the test instability? > Test instability PrometheusReporterTest > --- > > Key: FLINK-7933 > URL: https://issues.apache.org/jira/browse/FLINK-7933 > Project: Flink > Issue Type: Bug > Components: Metrics, Tests >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Chesnay Schepler >Priority: Critical > Labels: test-stability > > Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
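The review question above is whether a reporter that is never closed could explain the flaky test. The general failure mode can be shown with plain sockets: a listening port that is not released makes the next bind on that port fail. The sketch below uses only `java.net.ServerSocket`; the class `PortReuseDemo` and the helper `canBind` are hypothetical and unrelated to the Prometheus reporter's own code. Passing port `0` asks the operating system for any free port.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class PortReuseDemo {
    // Tries to open a listening socket on the port and closes it again right away.
    static boolean canBind(int port) {
        try (ServerSocket probe = new ServerSocket(port)) {
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Port 0 lets the operating system choose a free port.
        ServerSocket first = new ServerSocket(0);
        int port = first.getLocalPort();

        // While the first socket is open, a second bind on the same port fails.
        System.out.println("bind while held:   " + canBind(port));

        // After closing, the port is available again.
        first.close();
        System.out.println("bind after close:  " + canBind(port));
    }
}
```

A test that binds a fixed port or a small range and leaks the listener can therefore break a later test in the same JVM, which is why cleanup in an `@After` method matters here.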
[GitHub] flink pull request #4908: [FLINK-7933][metrics] Improve PrometheusReporter t...
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4908#discussion_r147357533

    --- Diff: flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java ---
    @@ -72,10 +72,30 @@
         private final AbstractID taskAttemptId2 = new AbstractID();
         private final String[] labelValues2 = {jobId.toString(), taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" + ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2};

    -    private final TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
    -    private final TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
    -    private final TaskMetricGroup taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER);
    -    private final TaskMetricGroup taskMetricGroup2 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER);
    +    private TaskMetricGroup taskMetricGroup1;
    +    private TaskMetricGroup taskMetricGroup2;
    +
    +    private MetricRegistry registry;
    +    private int port;
    --- End diff --

    Can't we access `getPort()` instead of using this variable? Isn't it redundant?

---
[jira] [Commented] (FLINK-7933) Test instability PrometheusReporterTest
[ https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221978#comment-16221978 ]

ASF GitHub Bot commented on FLINK-7933:
---

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4908#discussion_r147357050

    --- Diff: flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTaskScopeTest.java ---
    @@ -72,10 +72,30 @@
         private final AbstractID taskAttemptId2 = new AbstractID();
         private final String[] labelValues2 = {jobId.toString(), taskId2.toString(), taskAttemptId2.toString(), TASK_MANAGER_HOST, TASK_NAME, "" + ATTEMPT_NUMBER, JOB_NAME, TASK_MANAGER_ID, "" + SUBTASK_INDEX_2};

    -    private final TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
    -    private final TaskManagerJobMetricGroup tmJobMetricGroup = new TaskManagerJobMetricGroup(registry, tmMetricGroup, jobId, JOB_NAME);
    -    private final TaskMetricGroup taskMetricGroup1 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId1, taskAttemptId1, TASK_NAME, SUBTASK_INDEX_1, ATTEMPT_NUMBER);
    -    private final TaskMetricGroup taskMetricGroup2 = new TaskMetricGroup(registry, tmJobMetricGroup, taskId2, taskAttemptId2, TASK_NAME, SUBTASK_INDEX_2, ATTEMPT_NUMBER);
    +    private TaskMetricGroup taskMetricGroup1;
    +    private TaskMetricGroup taskMetricGroup2;
    +
    +    private MetricRegistry registry;
    +    private int port;
    +
    +    @Before
    +    public void setupReporter() {
    +        registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
    --- End diff --

    Can't we bind to a random port (`0`) and then call `getPort()`, as you already do now?

> Test instability PrometheusReporterTest
> ---
>
> Key: FLINK-7933
> URL: https://issues.apache.org/jira/browse/FLINK-7933
> Project: Flink
> Issue Type: Bug
> Components: Metrics, Tests
> Affects Versions: 1.4.0
> Reporter: Kostas Kloudas
> Assignee: Chesnay Schepler
> Priority: Critical
> Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
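The "bind to port `0`" suggestion above relies on standard socket behavior: asking for port 0 lets the operating system pick a free ephemeral port, which can then be read back. The following is a minimal JDK-only sketch of that idea; `EphemeralPortExample` and `bindAndGetPort` are hypothetical names, and whether `PrometheusReporter` accepts `0` in its port configuration is an assumption the thread leaves open.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Hypothetical illustration, not Flink code: let the OS choose a free port.
class EphemeralPortExample {

    /** Binds to port 0 and returns the port the OS actually assigned. */
    static int bindAndGetPort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            // getLocalPort() reports the real port, never 0, once the socket is bound.
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("OS-assigned port: " + bindAndGetPort());
    }
}
```

Compared with a fixed range such as `9400-9500`, this avoids collisions with ports that parallel test runs or other processes on the same machine already hold.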
[jira] [Commented] (FLINK-7933) Test instability PrometheusReporterTest
[ https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221977#comment-16221977 ]

ASF GitHub Bot commented on FLINK-7933:
---

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4908#discussion_r147357330

    --- Diff: flink-metrics/flink-metrics-prometheus/src/test/java/org/apache/flink/metrics/prometheus/PrometheusReporterTest.java ---
    @@ -70,9 +69,25 @@
         @Rule
         public ExpectedException thrown = ExpectedException.none();

    -    private final MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "" + NON_DEFAULT_PORT)));
    -    private final FrontMetricGroup metricGroup = new FrontMetricGroup<>(0, new TaskManagerMetricGroup(registry, HOST_NAME, TASK_MANAGER));
    -    private final MetricReporter reporter = registry.getReporters().get(0);
    +    private MetricRegistry registry;
    +    private FrontMetricGroup metricGroup;
    +    private PrometheusReporter reporter;
    +    private int port;
    --- End diff --

    Do we need this extra field for that? Can't we access `reporter.getPort()`? Isn't it redundant?

> Test instability PrometheusReporterTest
> ---
>
> Key: FLINK-7933
> URL: https://issues.apache.org/jira/browse/FLINK-7933
> Project: Flink
> Issue Type: Bug
> Components: Metrics, Tests
> Affects Versions: 1.4.0
> Reporter: Kostas Kloudas
> Assignee: Chesnay Schepler
> Priority: Critical
> Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7933) Test instability PrometheusReporterTest
[ https://issues.apache.org/jira/browse/FLINK-7933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221963#comment-16221963 ]

ASF GitHub Bot commented on FLINK-7933:
---

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/4908

    Thanks, looks good, +1 to merge

> Test instability PrometheusReporterTest
> ---
>
> Key: FLINK-7933
> URL: https://issues.apache.org/jira/browse/FLINK-7933
> Project: Flink
> Issue Type: Bug
> Components: Metrics, Tests
> Affects Versions: 1.4.0
> Reporter: Kostas Kloudas
> Assignee: Chesnay Schepler
> Priority: Critical
> Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/293220196

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec
[ https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221940#comment-16221940 ]

ASF GitHub Bot commented on FLINK-7878:
---

GitHub user shuai-xu opened a pull request:

    https://github.com/apache/flink/pull/4911

    [FLINK-7878] [api] make resource type extendible in ResourceSpec

    Summary:
    Currently, Flink only lets users define CPU and memory requirements, but some users need to specify other resources such as GPU or FPGA. The resource type in ResourceSpec therefore needs to be extensible. This change adds an extension field for new resources.

    ## What is the purpose of the change

    This pull request adds an extensible field to ResourceSpec, so that users can define additional resources, provided their resource manager supports them.

    *(for example:)*
    Users can write _text.flatMap().setResource(new ResourceSpec(1, 100, new ResourceSpec.Resource("GPU", 0.5)));_ to define the GPU requirement for the operator.

    ## Verifying this change

    This change added tests and can be verified as follows:

      - Added unit tests in ResourceSpecTest.

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

    ## Documentation

      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shuai-xu/flink jira-7878

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4911.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4911

commit 3e1d61a33f18b351424d4684cbaebc22674f582c
Author: shuai.xus
Date: 2017-10-25T06:56:35Z

    [FLINK-7878] [api] make resource type extendible in ResourceSpec

    Summary:
    Currently, Flink only lets users define CPU and memory requirements, but some users need to specify other resources such as GPU or FPGA. The resource type in ResourceSpec therefore needs to be extensible. This change adds an extension field for new resources.

    Test Plan: UnitTest

    Reviewers: haitao.w

    Differential Revision: https://aone.alibaba-inc.com/code/D327427

> Extend the resource type user can define in ResourceSpec
> ---
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
> Issue Type: Bug
> Components: DataSet API, DataStream API
> Reporter: shuai.xu
> Assignee: shuai.xu
> Labels: flip-6
>
> Currently, Flink only lets users define how much CPU and memory an operator
> uses, but the resources available in a cluster are more varied. For example,
> an image-processing application may need a GPU, while others may need an FPGA.
> CPU and memory alone are not enough, and the number of resource types keeps
> growing, so we need to make ResourceSpec extensible.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
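As a rough illustration of what an "extensible field" for additional resource types could look like, here is a self-contained sketch. It is not the code from pull request #4911: the class `ExtensibleResourceSpecSketch`, its constructor, and its `merge` and `getExtended` methods are hypothetical and chosen only to show the idea of keeping extra resources in a name-to-amount map next to the fixed CPU and memory fields.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Flink's ResourceSpec: fixed CPU/memory plus named extra resources.
final class ExtensibleResourceSpecSketch {
    private final double cpuCores;
    private final int heapMemoryMb;
    private final Map<String, Double> extended; // e.g. "GPU" -> 0.5

    ExtensibleResourceSpecSketch(double cpuCores, int heapMemoryMb, Map<String, Double> extended) {
        this.cpuCores = cpuCores;
        this.heapMemoryMb = heapMemoryMb;
        // Defensive copy so the spec stays immutable.
        this.extended = Collections.unmodifiableMap(new HashMap<>(extended));
    }

    double getCpuCores() {
        return cpuCores;
    }

    int getHeapMemoryMb() {
        return heapMemoryMb;
    }

    /** Returns the requested amount of the named resource, or 0.0 if none was requested. */
    double getExtended(String name) {
        return extended.getOrDefault(name, 0.0);
    }

    /** Combines two specs by summing every resource, as one might when operators share a slot. */
    ExtensibleResourceSpecSketch merge(ExtensibleResourceSpecSketch other) {
        Map<String, Double> merged = new HashMap<>(extended);
        other.extended.forEach((name, amount) -> merged.merge(name, amount, Double::sum));
        return new ExtensibleResourceSpecSketch(
                cpuCores + other.cpuCores, heapMemoryMb + other.heapMemoryMb, merged);
    }
}
```

A map keyed by resource name means a new resource type (GPU, FPGA, and so on) needs no new field or constructor overload; the trade-off is that resource names are unchecked strings that the resource manager has to recognize.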
[jira] [Commented] (FLINK-7548) Support watermark generation for TableSource
[ https://issues.apache.org/jira/browse/FLINK-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16221869#comment-16221869 ]

ASF GitHub Bot commented on FLINK-7548:
---

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4894#discussion_r147339907

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala ---
    @@ -18,55 +18,7 @@

     package org.apache.flink.table.plan.nodes

    -import org.apache.flink.api.common.functions.Function
    -import org.apache.flink.api.common.typeinfo.TypeInformation
    -import org.apache.flink.table.api.TableConfig
    -import org.apache.flink.table.codegen.{FunctionCodeGenerator, GeneratedFunction}
    -import org.apache.flink.types.Row
    -
     /**
       * Common class for batch and stream scans.
       */
    -trait CommonScan[T] {
    -
    -  /**
    -    * We check if the input type is exactly the same as the internal row type.
    -    * A conversion is necessary if types differ.
    -    */
    -  private[flink] def needsConversion(
    -      externalTypeInfo: TypeInformation[Any],
    -      internalTypeInfo: TypeInformation[T]): Boolean =
    -    externalTypeInfo != internalTypeInfo
    -
    -  private[flink] def generatedConversionFunction[F <: Function](
    -      config: TableConfig,
    -      functionClass: Class[F],
    -      inputType: TypeInformation[Any],
    -      expectedType: TypeInformation[Row],
    -      conversionOperatorName: String,
    -      fieldNames: Seq[String],
    -      inputFieldMapping: Option[Array[Int]] = None)
    -    : GeneratedFunction[F, Row] = {
    -
    -    val generator = new FunctionCodeGenerator(
    -      config,
    -      false,
    -      inputType,
    -      None,
    -      inputFieldMapping)
    -    val conversion = generator.generateConverterResultExpression(expectedType, fieldNames)
    -
    -    val body =
    -      s"""
    -         |${conversion.code}
    -         |return ${conversion.resultTerm};
    -         |""".stripMargin
    -
    -    generator.generateFunction(
    -      conversionOperatorName,
    -      functionClass,
    -      body,
    -      expectedType)
    -  }
    -
    -}
    +trait CommonScan[T]
    --- End diff --

    I kept it for consistency. The other operators have a CommonX trait as well.

> Support watermark generation for TableSource
>
> Key: FLINK-7548
> URL: https://issues.apache.org/jira/browse/FLINK-7548
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Reporter: Jark Wu
> Assignee: Fabian Hueske
> Priority: Blocker
> Fix For: 1.4.0
>
> As discussed in FLINK-7446, the TableSource currently only supports defining a
> rowtime field, but does not support extracting watermarks from that field.
> We can provide a new interface called {{DefinedWatermark}}, which has two
> methods: {{getRowtimeAttribute}} (which can only be an existing field) and
> {{getWatermarkGenerator}}. The {{DefinedRowtimeAttribute}} will be marked
> deprecated.
> How to support periodic and punctuated watermarks, and whether to provide some
> built-in strategies, needs further discussion.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)