[jira] [Commented] (FLINK-11186) Event-time balancing for multiple Kafka partitions
[ https://issues.apache.org/jira/browse/FLINK-11186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725627#comment-16725627 ]

Paul Lin commented on FLINK-11186:
----------------------------------

[~tschamberger] Your reasoning is right. For the discussion about state sharing between subtasks, please refer to
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html].

> Event-time balancing for multiple Kafka partitions
> --------------------------------------------------
>
>                 Key: FLINK-11186
>                 URL: https://issues.apache.org/jira/browse/FLINK-11186
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API, Kafka Connector
>            Reporter: Tom Schamberger
>            Priority: Major
>
> Currently, it is not possible in Flink to back-pressure individual Kafka partitions that are faster in terms of event time. This leads to unnecessary memory consumption and can cause deadlocks under back-pressure.
> When multiple Kafka topics are consumed, succeeding event-time window operators have to wait until the last Kafka partition has produced a sufficient watermark before they can be triggered. If individual Kafka partitions differ in read performance, or the event time of messages within partitions does not progress monotonically, 'fast' partitions (whose event time makes fast progress) can outrun slower partitions until back-pressure prevents all partitions from being consumed further. This leads to a deadlock of the application.
> I suggest that windows should be able to back-pressure individual partitions that progress faster in terms of event time, so that slow partitions can keep up.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
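The mechanism behind the deadlock described in the issue can be modeled outside of Flink: a downstream event-time operator can only advance to the minimum watermark over all input partitions, so one slow partition holds the whole pipeline back while fast partitions keep buffering. The following standalone Java sketch is illustrative only (all names are hypothetical, not Flink API):

```java
import java.util.Arrays;

// Standalone model of the behavior described in the issue: the operator
// watermark is the minimum over per-partition watermarks, so data from
// "fast" partitions accumulates until back-pressure kicks in.
public class WatermarkSkew {

    // Operator watermark = min over per-partition watermarks.
    static long operatorWatermark(long[] partitionWatermarks) {
        return Arrays.stream(partitionWatermarks).min().orElse(Long.MIN_VALUE);
    }

    // Rough count of elements a window operator must buffer because fast
    // partitions are ahead of the operator watermark (assuming a constant
    // event rate per millisecond of event time).
    static long bufferedAhead(long[] partitionWatermarks, long eventsPerMilli) {
        long min = operatorWatermark(partitionWatermarks);
        return Arrays.stream(partitionWatermarks)
                .map(w -> (w - min) * eventsPerMilli)
                .sum();
    }

    public static void main(String[] args) {
        long[] wm = {10_000L, 10_000L, 1_000L}; // two fast partitions, one slow
        System.out.println(operatorWatermark(wm));   // 1000
        System.out.println(bufferedAhead(wm, 2));    // (9000 + 9000 + 0) * 2 = 36000
    }
}
```

The proposal in the issue amounts to pausing reads from partitions whose contribution to `bufferedAhead` grows too large, instead of letting global back-pressure stall the slow partition as well.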
[jira] [Comment Edited] (FLINK-11174) flink Metrics Prometheus labels support chinese
[ https://issues.apache.org/jira/browse/FLINK-11174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725618#comment-16725618 ]

lamber-ken edited comment on FLINK-11174 at 12/20/18 6:38 AM:
--------------------------------------------------------------

Hi [~fanweiwen], you can follow [https://flink.apache.org/how-to-contribute.html] to submit your PR for this issue. This is a good idea from my side.

was (Author: lamber-ken):
Hi [~fanweiwen], you can follow [https://flink.apache.org/how-to-contribute.html] to submit your PR for this issue.

> flink Metrics Prometheus labels support chinese
> -----------------------------------------------
>
>                 Key: FLINK-11174
>                 URL: https://issues.apache.org/jira/browse/FLINK-11174
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics
>    Affects Versions: 1.6.2, 1.7.0
>            Reporter: Fan weiwen
>            Assignee: TisonKun
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: image-2018-12-17-17-17-05-965.png
>
> When using Flink metrics with Prometheus, my job name is Chinese, but
> org.apache.flink.metrics.prometheus.AbstractPrometheusReporter's
> replaceInvalidChars only supports [a-zA-Z0-9:_], so every character of
> my job name gets replaced.
> I think it is fine for label keys to be restricted to [a-zA-Z0-9:_],
> but could label values support Chinese?
[jira] [Commented] (FLINK-11174) flink Metrics Prometheus labels support chinese
[ https://issues.apache.org/jira/browse/FLINK-11174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725618#comment-16725618 ]

lamber-ken commented on FLINK-11174:
------------------------------------

Hi [~fanweiwen], you can follow [https://flink.apache.org/how-to-contribute.html] to submit your PR for this issue.

> flink Metrics Prometheus labels support chinese
> -----------------------------------------------
>
>                 Key: FLINK-11174
>                 URL: https://issues.apache.org/jira/browse/FLINK-11174
>             Project: Flink
>          Issue Type: Improvement
>          Components: Metrics
>    Affects Versions: 1.6.2, 1.7.0
>            Reporter: Fan weiwen
>            Assignee: TisonKun
>            Priority: Minor
>              Labels: pull-request-available
>         Attachments: image-2018-12-17-17-17-05-965.png
>
> When using Flink metrics with Prometheus, my job name is Chinese, but
> org.apache.flink.metrics.prometheus.AbstractPrometheusReporter's
> replaceInvalidChars only supports [a-zA-Z0-9:_], so every character of
> my job name gets replaced.
> I think it is fine for label keys to be restricted to [a-zA-Z0-9:_],
> but could label values support Chinese?
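The behavior the reporter complains about — every character outside `[a-zA-Z0-9:_]` being replaced — can be reproduced with a plain `String.replaceAll`. The sketch below mirrors the character class named in the issue; the actual `replaceInvalidChars` implementation may differ in details:

```java
public class LabelSanitizer {
    // Mirrors the character class from the issue: everything outside
    // [a-zA-Z0-9:_] is collapsed to '_'. A Chinese job name therefore
    // degenerates into a run of underscores.
    static String replaceInvalidChars(String input) {
        return input.replaceAll("[^a-zA-Z0-9:_]", "_");
    }

    public static void main(String[] args) {
        System.out.println(replaceInvalidChars("my_job:v1")); // my_job:v1 (unchanged)
        System.out.println(replaceInvalidChars("作业名-v1"));  // ____v1
    }
}
```

Note that Prometheus's data model only restricts metric and label *names*; label *values* may contain any UTF-8 characters, which is why relaxing the sanitization for values, as the issue asks, is plausible.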
[GitHub] klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-448885155

@azagrebin @zentol thank you for the review. I've just updated the code, leaving the logic of `getNumberOfRestoringThreads` unchanged and in sync with the logic for `enableIncrementalCheckpointing` for now.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] dianfu commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
dianfu commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r243160395

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala
## @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class NullSerializer extends TypeSerializerSingleton[Any] {

Review comment:
What about only adding UserDefinedFunctionUtilTest#testRemoveStateViewFieldsFromAccTypeInfo in this PR? Just like you said, the tests of other methods of UserDefinedFunctionUtil can be done in another JIRA if needed.
[GitHub] yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink
yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r243132804

## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamodbStreamsProxy.java
## @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ENDPOINT;
+import static org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_DYNAMODB_STREAMS_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_DYNAMODB_STREAMS_DESCRIBE_BACKOFF_MAX;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DYNAMODB_STREAMS_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT;
+import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DYNAMODB_STREAMS_DESCRIBE_BACKOFF_MAX;
+import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.getCredentialsProvider;
+import static org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.setAwsClientConfigProperties;
+
+/**
+ * DynamoDB streams proxy: interface interacting with the DynamoDB streams.
+ */
+public class DynamodbStreamsProxy extends KinesisProxy {
+	private static final Logger LOG = LoggerFactory.getLogger(DynamodbStreamsProxy.class);
+
+	/** Used for formatting Flink-specific user agent string when creating Kinesis client. */
+	private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) DynamoDB Streams Connector";
+
+	// Backoff millis for the describe stream operation.
+	private final long describeStreamBaseBackoffMillis;
+	// Maximum backoff millis for the describe stream operation.
+	private final long describeStreamMaxBackoffMillis;
+	// Exponential backoff power constant for the describe stream operation.
+	private final double describeStreamExpConstant;
+
+	protected DynamodbStreamsProxy(Properties configProps) {
+		super(configProps);
+
+		// parse properties
+		describeStreamBaseBackoffMillis = Long.valueOf(
+			configProps.getProperty(DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE,
+				Long.toString(DEFAULT_DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE)));
+		describeStreamMaxBackoffMillis = Long.valueOf(
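The base, max, and exponential-constant properties parsed above drive an exponential backoff for `describeStream` retries. A hedged sketch of how such parameters typically combine is below; Flink's Kinesis proxy additionally multiplies by a random jitter factor, which is omitted here so the computation is deterministic:

```java
public class DescribeStreamBackoff {
    // Deterministic bounded exponential backoff:
    //   backoff(attempt) = min(max, base * expConstant^attempt)
    // The real connector adds random jitter on top of this; that is left
    // out of this illustrative sketch.
    static long backoffMillis(long baseMillis, long maxMillis,
                              double expConstant, int attempt) {
        double candidate = baseMillis * Math.pow(expConstant, attempt);
        return (long) Math.min((double) maxMillis, candidate);
    }

    public static void main(String[] args) {
        // Example values: base = 1000 ms, max = 5000 ms, constant = 1.5.
        System.out.println(backoffMillis(1000, 5000, 1.5, 0));  // 1000
        System.out.println(backoffMillis(1000, 5000, 1.5, 2));  // 2250
        System.out.println(backoffMillis(1000, 5000, 1.5, 10)); // capped at 5000
    }
}
```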
[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r243154482

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala
## @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class NullSerializer extends TypeSerializerSingleton[Any] {

Review comment:
Hmm, I think that's a good idea, but it should probably be addressed in another JIRA. I actually have [PR:6472](https://github.com/apache/flink/pull/6472) still pending, and I am also very fond of the idea of adding tests for the utility class.
[jira] [Closed] (FLINK-11077) Make subtask aware of the timeout of checkpoint and abort the current ongoing asynccheckpoint
[ https://issues.apache.org/jira/browse/FLINK-11077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

aitozi closed FLINK-11077.
--------------------------
    Resolution: Won't Fix

> Make subtask aware of the timeout of checkpoint and abort the current ongoing asynccheckpoint
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11077
>                 URL: https://issues.apache.org/jira/browse/FLINK-11077
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.8.0
>            Reporter: aitozi
>            Assignee: aitozi
>            Priority: Major
>
> Currently the checkpoint coordinator cancels a checkpoint when it times out, but the subtask continues to execute the async and sync parts of the timed-out checkpoint. When the checkpoint state is large, this is a waste of bandwidth. So I think the AsyncCheckpointRunnable should be made aware of the timed-out checkpoint and interrupt it. What do you think [~StephanEwen] [~srichter]
[GitHub] dianfu commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
dianfu commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r243143806

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala
## @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class NullSerializer extends TypeSerializerSingleton[Any] {

Review comment:
The tests still pass without NullSerializer because NullSerializer is an optimization; it doesn't affect correctness, so an IT case cannot cover this kind of test. Maybe we should add a unit test for UserDefinedFunctionUtil#removeStateViewFieldsFromAccTypeInfo. But I found that there is currently no unit test for UserDefinedFunctionUtil. What do you think about adding a unit test class for UserDefinedFunctionUtil?
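For readers following this thread: a "null serializer" in this context is an optimization for fields whose real value lives in state-backed storage (the DataView's backing state), so serializing the field itself only needs a placeholder. The concept can be sketched in a self-contained way, deliberately *not* using Flink's `TypeSerializer` API (all names below are hypothetical):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Conceptual sketch of a "null serializer": the field's value is stored
// elsewhere (state backend), so (de)serialization of the field writes and
// reads only a single placeholder byte.
public class NullSerializerSketch {

    static byte[] serializeToBytes(Object ignored) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            new DataOutputStream(bos).writeByte(0); // placeholder only
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Object deserializeFromBytes(byte[] bytes) {
        try {
            new DataInputStream(new ByteArrayInputStream(bytes)).readByte();
            return null; // caller re-attaches the state-backed view
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = serializeToBytes("any accumulator field");
        System.out.println(payload.length);                 // 1
        System.out.println(deserializeFromBytes(payload));  // null
    }
}
```

This also illustrates dianfu's point above: because the value is reconstructed from state regardless, removing the optimization changes only the serialized size, not correctness, which is why an IT case cannot catch its removal.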
[GitHub] hequn8128 edited a comment on issue #6236: [FLINK-9699] [table] Add api to replace registered table
hequn8128 edited a comment on issue #6236: [FLINK-9699] [table] Add api to replace registered table
URL: https://github.com/apache/flink/pull/6236#issuecomment-448821407

Hi @bowenli86, great to have your comments from the perspective of the Catalog. I took a look at the Catalog API, and it seems the internal replace flag may fit well with it. The flag can be replaced by `dropTable(ignoreIfNotExists=true) + createTable` once the temp tables registered via TableEnvironment have been moved into an in-memory catalog.
[GitHub] Clarkkkkk commented on issue #7258: [FLINK-11084]Throw a hard exception to remind developers while there's no stream node between two split transformation
Clarkkkkk commented on issue #7258: [FLINK-11084]Throw a hard exception to remind developers while there's no stream node between two split transformation
URL: https://github.com/apache/flink/pull/7258#issuecomment-448830390

cc @dawidwys
[GitHub] yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink
yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r243132531

## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
## @@ -143,6 +143,34 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
 	/** The interval after which to consider a shard idle for purposes of watermark generation. */
 	public static final String SHARD_IDLE_INTERVAL_MILLIS = "flink.shard.idle.interval";
 
+	/**
+	 * The base backoff time between each describeStream attempt.
+	 * Different tag name to distinguish from "flink.stream.describe.backoff.base"
+	 * since the latter is deprecated.
+	 */
+	public static final String DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE =

Review comment:
OK.
[GitHub] hequn8128 commented on issue #6236: [FLINK-9699] [table] Add api to replace registered table
hequn8128 commented on issue #6236: [FLINK-9699] [table] Add api to replace registered table
URL: https://github.com/apache/flink/pull/6236#issuecomment-448821407

Hi @bowenli86, thanks for your advice. The internal replace flag may fit well with the API of the Catalog. The flag can be replaced by `dropTable(ignoreIfNotExists=true) + createTable` once the temp tables registered via TableEnvironment have been moved into an in-memory catalog.
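The equivalence mentioned here — a replace flag reducing to `dropTable(ignoreIfNotExists=true) + createTable` — can be demonstrated against a toy in-memory catalog. All names below are hypothetical and are not Flink's Catalog API:

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory catalog showing how an internal "replace" flag reduces to
// dropTable(ignoreIfNotExists = true) followed by createTable.
public class ToyCatalog {
    private final Map<String, String> tables = new HashMap<>();

    void createTable(String name, String definition) {
        if (tables.containsKey(name)) {
            throw new IllegalStateException("Table already exists: " + name);
        }
        tables.put(name, definition);
    }

    void dropTable(String name, boolean ignoreIfNotExists) {
        if (!tables.containsKey(name) && !ignoreIfNotExists) {
            throw new IllegalStateException("No such table: " + name);
        }
        tables.remove(name);
    }

    // replaceTable == drop-if-exists + create: succeeds whether or not
    // the table already existed.
    void replaceTable(String name, String definition) {
        dropTable(name, true);
        createTable(name, definition);
    }

    String definitionOf(String name) {
        return tables.get(name);
    }

    // Exercises both the "not yet registered" and "already registered" cases.
    static String demo() {
        ToyCatalog c = new ToyCatalog();
        c.replaceTable("t", "v1"); // works even though "t" never existed
        c.replaceTable("t", "v2"); // silently replaces the definition
        return c.definitionOf("t");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // v2
    }
}
```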
[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725476#comment-16725476 ]

lining commented on FLINK-11162:
--------------------------------

@Chesnay Schepler, thanks for your reply. Maybe we need to discuss it: as jobs become more complex, operator chaining makes it difficult for users to manage the job.

> Provide a rest API to list all logical operators
> ------------------------------------------------
>
>                 Key: FLINK-11162
>                 URL: https://issues.apache.org/jira/browse/FLINK-11162
>             Project: Flink
>          Issue Type: New Feature
>          Components: REST
>            Reporter: vinoyang
>            Assignee: lining
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> The scene of this issue:
> We are using the indicator variables of the operator: , .
> We have customized the display of the indicator. For query purposes, we currently lack an interface to get all the logical operators of a job; the current REST API only provides the chained node information.
[GitHub] aljoscha commented on a change in pull request #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
aljoscha commented on a change in pull request #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
URL: https://github.com/apache/flink/pull/7313#discussion_r243039964

## File path: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
## @@ -121,24 +127,14 @@ public boolean supportsResume() {
 		return true;
 	}
 
-	@VisibleForTesting
-	static org.apache.hadoop.fs.Path generateStagingTempFilePath(
-			org.apache.hadoop.fs.FileSystem fs,
-			org.apache.hadoop.fs.Path targetFile) throws IOException {
-
+	private static org.apache.hadoop.fs.Path generateStagingTempFilePath(org.apache.hadoop.fs.Path targetFile) {
 		checkArgument(targetFile.isAbsolute(), "targetFile must be absolute");
 
 		final org.apache.hadoop.fs.Path parent = targetFile.getParent();
 		final String name = targetFile.getName();
 
 		checkArgument(parent != null, "targetFile must not be the root directory");
 
-		while (true) {
-			org.apache.hadoop.fs.Path candidate = new org.apache.hadoop.fs.Path(
-					parent, "." + name + ".inprogress." + UUID.randomUUID().toString());
-			if (!fs.exists(candidate)) {
-				return candidate;
-			}
-		}
+		return new org.apache.hadoop.fs.Path(parent, "." + name + ".inprogress");

Review comment:
We should definitely wait for @StephanEwen to comment on this (as you already discussed in the comments), since he's the original author and there might be a good reason behind this.
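The diff under review replaces a UUID-probing loop with a deterministic staging name. The naming scheme itself is simple string assembly; the standalone sketch below shows both variants without Hadoop's `Path` type (a hypothetical simplification, not the connector's code):

```java
public class StagingPath {
    // Deterministic variant from the diff: ".<name>.inprogress".
    // The same target file always maps to the same staging file.
    static String deterministicStagingName(String targetName) {
        return "." + targetName + ".inprogress";
    }

    // Shape of the older randomized variant: ".<name>.inprogress.<uuid>",
    // which required probing the filesystem until an unused name was found.
    static String randomizedStagingName(String targetName, String uuid) {
        return "." + targetName + ".inprogress." + uuid;
    }

    public static void main(String[] args) {
        System.out.println(deterministicStagingName("part-0"));
        // .part-0.inprogress
    }
}
```

The trade-off being debated in the thread: a deterministic name is predictable across restarts, while the random suffix guaranteed that a fresh attempt never collided with a leftover staging file.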
[GitHub] joerg84 commented on issue #7338: Fixed comment about scala versions.
joerg84 commented on issue #7338: Fixed comment about scala versions.
URL: https://github.com/apache/flink/pull/7338#issuecomment-448706656

> This is incorrect as Flink 1.7 also builds with scala 2.12.
[GitHub] bowenli86 commented on issue #6236: [FLINK-9699] [table] Add api to replace registered table
bowenli86 commented on issue #6236: [FLINK-9699] [table] Add api to replace registered table
URL: https://github.com/apache/flink/pull/6236#issuecomment-448693612

I got your points. Sounds good. Thanks
[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r243014619

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
## @@ -489,6 +491,88 @@ object UserDefinedFunctionUtils {
     )
   }
 
+    def decorateDataViewTypeInfo(
+        fieldTypeInfo: TypeInformation[_],
+        fieldInstance: AnyRef,
+        field: Field): (TypeInformation[_], Option[DataViewSpec[_]]) = fieldTypeInfo match {
+      case ct: CompositeType[_] if includesDataView(ct) =>
+        throw new TableException(
+          "MapView and ListView only supported at first level of accumulators of Pojo, Tuple " +
+            "and Case Class type.")
+      case map: MapViewTypeInfo[_, _] =>
+        val mapView = fieldInstance.asInstanceOf[MapView[_, _]]
+        val newTypeInfo = if (mapView != null && mapView.keyTypeInfo != null &&
+            mapView.valueTypeInfo != null) {
+          new MapViewTypeInfo(mapView.keyTypeInfo, mapView.valueTypeInfo)
+        } else {
+          map
+        }
+
+        if (isStateBackedDataViews) {
+          newTypeInfo.nullSerializer = true
+
+          // create map view specs with unique id (used as state name)
+          val fieldName = field.getName
+          var spec = MapViewSpec(
+            "agg" + index + "$" + fieldName,
+            field,
+            newTypeInfo)
+
+          (newTypeInfo, Some(spec))
+        } else {
+          (newTypeInfo, None)
+        }
+
+      case list: ListViewTypeInfo[_] =>

Review comment:
Sorry, I was not making it clear. I meant maybe we can add a `Tuple` test case.
[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction URL: https://github.com/apache/flink/pull/7237#discussion_r243015377 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.dataview + +import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton +import org.apache.flink.core.memory.{DataInputView, DataOutputView} + +class NullSerializer extends TypeSerializerSingleton[Any] { Review comment: Yeah, I also traced the same code path for Pojo. However, removing the `NullSerializer` still lets the newly modified `testGroupAggregateWithStateBackend` pass. This is worrying, because essentially there's no test safeguard against this piece of code being removed in the future.
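The reviewer's worry above is that nothing fails if `NullSerializer` disappears. As a stdlib-only illustration of the idea (this is not Flink's actual `NullSerializer`, and the class and method names below are hypothetical), the contract a dedicated regression test could pin down is: serialization writes only a fixed placeholder, and deserialization always yields null:

```java
// Stdlib-only sketch of the "null serializer" concept discussed above:
// the serialized form carries no information, and reading it back always
// produces null. A regression test for the real NullSerializer could
// assert exactly this round-trip behavior.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class NullRoundTrip {

    static void serialize(Object ignored, DataOutputStream out) throws IOException {
        out.writeByte(0); // fixed placeholder; the value itself is never stored
    }

    static Object deserialize(DataInputStream in) throws IOException {
        in.readByte(); // consume the placeholder
        return null;   // by contract, the deserialized value is always null
    }

    // Serializes a value and deserializes it again from the produced bytes.
    static Object roundTrip(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serialize(value, new DataOutputStream(bytes));
        return deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    }
}
```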
[GitHub] XuQianJin-Stars commented on issue #7318: [FLINK-11099][Table API] Migrate flink-table runtime CRow Types classes
XuQianJin-Stars commented on issue #7318: [FLINK-11099][Table API] Migrate flink-table runtime CRow Types classes URL: https://github.com/apache/flink/pull/7318#issuecomment-448658898 @twalthr I update the branch or do a force push. thanks qianjin
[GitHub] aljoscha commented on a change in pull request #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
aljoscha commented on a change in pull request #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers. URL: https://github.com/apache/flink/pull/7313#discussion_r242982233 ## File path: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java ## @@ -60,6 +60,12 @@ public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) { @Override public RecoverableFsDataOutputStream open(Path filePath) throws IOException { final org.apache.hadoop.fs.Path targetFile = HadoopFileSystem.toHadoopPath(filePath); + + // the finalized part must not exist already + if (fs.exists(targetFile)) { Review comment: Maybe add a more elaborate exception message that indicates that this is a bug, because this shouldn't happen unless you're in some deeper problem.
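A sketch of what the more elaborate exception message suggested above could look like; the helper class, method name, and wording are assumptions for illustration, not the actual Flink code:

```java
// Hypothetical helper for the "target file already exists" check in
// HadoopRecoverableWriter#open, making explicit that hitting this case
// indicates a bug rather than ordinary user error.
import java.io.IOException;

public class TargetFileCheck {

    // Builds a descriptive message naming the offending file.
    static String alreadyExistsMessage(String targetFile) {
        return String.format(
            "The finalized part file %s already exists. This should not happen and "
            + "likely indicates a bug in the sink, or that another writer is "
            + "concurrently producing the same file.", targetFile);
    }

    static void checkTargetDoesNotExist(boolean exists, String targetFile) throws IOException {
        if (exists) {
            throw new IOException(alreadyExistsMessage(targetFile));
        }
    }
}
```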
[GitHub] kl0u removed a comment on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
kl0u removed a comment on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers. URL: https://github.com/apache/flink/pull/7313#issuecomment-448653987 Yes, this is why I said "if the FS allows it". Thanks for the review @igalshilman ! I will merge...
[GitHub] kl0u commented on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
kl0u commented on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers. URL: https://github.com/apache/flink/pull/7313#issuecomment-448653987 Yes, this is why I said "if the FS allows it". Thanks for the review @igalshilman ! I will merge...
[GitHub] kl0u commented on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
kl0u commented on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers. URL: https://github.com/apache/flink/pull/7313#issuecomment-448653989 Yes, this is why I said "if the FS allows it". Thanks for the review @igalshilman ! I will merge...
[jira] [Commented] (FLINK-11143) AskTimeoutException is thrown during job submission and completion
[ https://issues.apache.org/jira/browse/FLINK-11143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725145#comment-16725145 ] Alex Vinnik commented on FLINK-11143: - [~QiLuo] we tried it, but it didn't solve the problem. Thanks for suggestion. > AskTimeoutException is thrown during job submission and completion > -- > > Key: FLINK-11143 > URL: https://issues.apache.org/jira/browse/FLINK-11143 > Project: Flink > Issue Type: Bug >Affects Versions: 1.6.2 >Reporter: Alex Vinnik >Priority: Major > > For more details please see the thread > [http://mail-archives.apache.org/mod_mbox/flink-user/201812.mbox/%3cc2fb26f9-1410-4333-80f4-34807481b...@gmail.com%3E] > On submission > 2018-12-12 02:28:31 ERROR JobsOverviewHandler:92 - Implementation error: > Unhandled exception. > akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#225683351|#225683351]] after [1 ms]. > Sender[null] sent message of type > "org.apache.flink.runtime.rpc.messages.LocalFencedMessage". 
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604) > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) > at > scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) > at > scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) > at > scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) > at > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) > at java.lang.Thread.run(Thread.java:748) > > On completion > > {"errors":["Internal server error."," side:\njava.util.concurrent.CompletionException: > akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#105638574]] after [1 ms]. > Sender[null] sent message of type > \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\". 
> at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) > at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) > at > java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772) > at akka.dispatch.OnComplete.internal(Future.scala:258) > at akka.dispatch.OnComplete.internal(Future.scala:256) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186) > at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183) > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36) > at > org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83) > at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) > at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) > at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603) > at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126) > at > scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601) > at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109) > at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599) > at > akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280) > at > akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284) > at > 
akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236) > at java.lang.Thread.run(Thread.java:748)\nCaused by: > akka.pattern.AskTimeoutException: Ask timed out on > [Actor[akka://flink/user/dispatcher#105638574]] after [1 ms]. > Sender[null] sent message of type > \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\". > at > akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\t... > 9 more\n\nEnd of exception on server side>"]} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
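The timeouts in the stack traces above come from Flink's actor-based RPC layer. A commonly suggested mitigation (which, per the comment above, did not resolve this particular report) is to raise the ask and REST timeouts in flink-conf.yaml; the values below are illustrative, not recommendations:

```yaml
# flink-conf.yaml - raising the timeouts behind the AskTimeoutException above.
# akka.ask.timeout governs internal RPC round-trips; web.timeout (milliseconds)
# governs REST handlers such as the JobsOverviewHandler in the log.
akka.ask.timeout: 60 s
web.timeout: 60000
```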
[jira] [Commented] (FLINK-11201) flink-test-utils dependency issue
[ https://issues.apache.org/jira/browse/FLINK-11201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725130#comment-16725130 ] eugen yushin commented on FLINK-11201: -- At the same time, this looks like the sbt issue "Transitive dependencies with classifier "test" are not include in the classpath" #2964: https://github.com/sbt/sbt/issues/2964 Nevertheless, I think it makes sense to add a note to the docs to point this out explicitly, so nobody spends extra time in the future searching for an explanation of this behavior. > flink-test-utils dependency issue > - > > Key: FLINK-11201 > URL: https://issues.apache.org/jira/browse/FLINK-11201 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.0 >Reporter: eugen yushin >Priority: Major > > Starting with Flink 1.7, there's lack of > `runtime.testutils.MiniClusterResource` class in `flink-test-utils` > distribution. > Steps to reproduce (Scala code) > build.sbt > {code} > name := "flink-17-test-issue" > organization := "x.y.z" > scalaVersion := "2.11.12" > val flinkVersion = "1.7.0" > libraryDependencies ++= Seq( > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided, > "org.scalatest" %% "scalatest" % "3.0.5" % Test, > "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test > // ,"org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier > Artifact.TestsClassifier > ) > {code} > test class: > {code} > class SimpleTest extends AbstractTestBase with FlatSpecLike { > implicit val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > env.setParallelism(1) > env.setRestartStrategy(RestartStrategies.noRestart()) > "SimpleTest" should "work" in { > val inputDs = env.fromElements(1,2,3) > inputDs.print() > env.execute() > } > } > {code} > Results in: > {code} > A needed class was not found. This could be due to an error in your runpath. 
> Missing class: org/apache/flink/runtime/testutils/MiniClusterResource > java.lang.NoClassDefFoundError: > org/apache/flink/runtime/testutils/MiniClusterResource > ... > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.runtime.testutils.MiniClusterResource > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > ... 31 more > {code} > This can be fixed by adding flink-runtime distribution with test classifier > into dependencies list. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
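The fix mentioned at the end of the issue corresponds to the line commented out in the build.sbt above: declare flink-runtime with the test-jar classifier explicitly, since sbt does not pull it in transitively (sbt issue #2964). A sketch, reusing the names from the reporter's build file:

```sbt
// build.sbt - explicitly adding the flink-runtime test artifact so that
// flink-test-utils finds MiniClusterResource on the test classpath.
// flinkVersion is the val defined in the build file above.
libraryDependencies +=
  "org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier Artifact.TestsClassifier
```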
[jira] [Comment Edited] (FLINK-9080) Flink Scheduler goes OOM, suspecting a memory leak
[ https://issues.apache.org/jira/browse/FLINK-9080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725121#comment-16725121 ] Nawaid Shamim edited comment on FLINK-9080 at 12/19/18 3:49 PM: *Workaround:* Copy Flink job JAR into /usr/lib/flink/lib and AppClassLoader will load all the required classes from classpath. was (Author: nawaidshamim): *Workaround:* Copy Flink job JAR into /usr/lib/flink/lib and AppClassLoader will load all the required classes from classpath. !Screenshot 2018-12-18 at 15.47.55.png! > Flink Scheduler goes OOM, suspecting a memory leak > -- > > Key: FLINK-9080 > URL: https://issues.apache.org/jira/browse/FLINK-9080 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.4.0 >Reporter: Rohit Singh >Assignee: Stefan Richter >Priority: Major > Attachments: Screenshot 2018-12-18 at 12.14.11.png, Top Level > packages.JPG, Top level classes.JPG, classesloaded vs unloaded.png > > > Running FLink version 1.4.0. on mesos,scheduler running along with job > manager in single container, whereas task managers running in seperate > containers. > Couple of jobs were running continously, Flink scheduler was working > properlyalong with task managers. Due to some change in data, one of the jobs > started failing continuously. In the meantime,there was a surge in flink > scheduler memory usually eventually died out off OOM > > Memory dump analysis was done, > Following were findings !Top Level packages.JPG!!Top level classes.JPG! > * Majority of top loaded packages retaining heap indicated towards > Flinkuserclassloader, glassfish(jersey library), Finalizer classes. 
(Top > level package image) > * Top level classes were of Flinkuserclassloader, (Top Level class image) > * The number of classes loaded vs unloaded was quite less PFA,inspite of > adding jvm options of -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled , > PFAclassloaded vs unloaded graph, scheduler was restarted 3 times > * There were custom classes as well which were duplicated during subsequent > class uploads > PFA all the images of heap dump. Can you suggest some pointers on as to how > to overcome this issue. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9080) Flink Scheduler goes OOM, suspecting a memory leak
[ https://issues.apache.org/jira/browse/FLINK-9080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725121#comment-16725121 ] Nawaid Shamim commented on FLINK-9080: -- *Workaround:* Copy Flink job JAR into /usr/lib/flink/lib and AppClassLoader will load all the required classes from classpath. !Screenshot 2018-12-18 at 15.47.55.png! > Flink Scheduler goes OOM, suspecting a memory leak > -- > > Key: FLINK-9080 > URL: https://issues.apache.org/jira/browse/FLINK-9080 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.4.0 >Reporter: Rohit Singh >Assignee: Stefan Richter >Priority: Major > Attachments: Screenshot 2018-12-18 at 12.14.11.png, Top Level > packages.JPG, Top level classes.JPG, classesloaded vs unloaded.png > > > Running FLink version 1.4.0. on mesos,scheduler running along with job > manager in single container, whereas task managers running in seperate > containers. > Couple of jobs were running continously, Flink scheduler was working > properlyalong with task managers. Due to some change in data, one of the jobs > started failing continuously. In the meantime,there was a surge in flink > scheduler memory usually eventually died out off OOM > > Memory dump analysis was done, > Following were findings !Top Level packages.JPG!!Top level classes.JPG! > * Majority of top loaded packages retaining heap indicated towards > Flinkuserclassloader, glassfish(jersey library), Finalizer classes. (Top > level package image) > * Top level classes were of Flinkuserclassloader, (Top Level class image) > * The number of classes loaded vs unloaded was quite less PFA,inspite of > adding jvm options of -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled , > PFAclassloaded vs unloaded graph, scheduler was restarted 3 times > * There were custom classes as well which were duplicated during subsequent > class uploads > PFA all the images of heap dump. Can you suggest some pointers on as to how > to overcome this issue. 
> > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11205) Task Manager Metaspace Memory Leak
[ https://issues.apache.org/jira/browse/FLINK-11205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nawaid Shamim updated FLINK-11205: -- Attachment: Screenshot 2018-12-18 at 15.47.55.png > Task Manager Metaspace Memory Leak > --- > > Key: FLINK-11205 > URL: https://issues.apache.org/jira/browse/FLINK-11205 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Nawaid Shamim >Priority: Major > Attachments: Screenshot 2018-12-18 at 12.14.11.png, Screenshot > 2018-12-18 at 15.47.55.png > > > Job Restarts causes task manager to dynamically load duplicate classes. > Metaspace is unbounded and grows with every restart. YARN aggressively kill > such containers but this affect is immediately seems on different task > manager which results in death spiral. > Task Manager uses dynamic loader as described in > [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html] > {quote} > *YARN* > YARN classloading differs between single job deployments and sessions: > * When submitting a Flink job/application directly to YARN (via {{bin/flink > run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are > started for that job. Those JVMs have both Flink framework classes and user > code classes in the Java classpath. That means that there is _no dynamic > classloading_ involved in that case. > * When starting a YARN session, the JobManagers and TaskManagers are started > with the Flink framework classes in the classpath. The classes from all jobs > that are submitted against the session are loaded dynamically. > {quote} > The above is not entirely true specially when you set {{-yD > classloader.resolve-order=parent-first}} . We also above observed the above > behaviour when submitting a Flink job/application directly to YARN (via > {{bin/flink run -m yarn-cluster ...}}). > !Screenshot 2018-12-18 at 12.14.11.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11205) Task Manager Metaspace Memory Leak
[ https://issues.apache.org/jira/browse/FLINK-11205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725120#comment-16725120 ] Nawaid Shamim commented on FLINK-11205: --- *Workaround:* Copy Flink job JAR into /usr/lib/flink/lib and AppClassLoader will load all the required classes from classpath. !Screenshot 2018-12-18 at 15.47.55.png! > Task Manager Metaspace Memory Leak > --- > > Key: FLINK-11205 > URL: https://issues.apache.org/jira/browse/FLINK-11205 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Nawaid Shamim >Priority: Major > Attachments: Screenshot 2018-12-18 at 12.14.11.png, Screenshot > 2018-12-18 at 15.47.55.png > > > Job Restarts causes task manager to dynamically load duplicate classes. > Metaspace is unbounded and grows with every restart. YARN aggressively kill > such containers but this affect is immediately seems on different task > manager which results in death spiral. > Task Manager uses dynamic loader as described in > [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html] > {quote} > *YARN* > YARN classloading differs between single job deployments and sessions: > * When submitting a Flink job/application directly to YARN (via {{bin/flink > run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are > started for that job. Those JVMs have both Flink framework classes and user > code classes in the Java classpath. That means that there is _no dynamic > classloading_ involved in that case. > * When starting a YARN session, the JobManagers and TaskManagers are started > with the Flink framework classes in the classpath. The classes from all jobs > that are submitted against the session are loaded dynamically. > {quote} > The above is not entirely true specially when you set {{-yD > classloader.resolve-order=parent-first}} . 
We also above observed the above > behaviour when submitting a Flink job/application directly to YARN (via > {{bin/flink run -m yarn-cluster ...}}). > !Screenshot 2018-12-18 at 12.14.11.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
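The classloading mode the reporter refers to can also be set cluster-wide in flink-conf.yaml rather than via `-yD`. Note that, per the issue text, the metaspace growth was still observed in some setups even with this setting, which is why the JAR-in-/usr/lib/flink/lib workaround is suggested in the comment above:

```yaml
# flink-conf.yaml - the classloading setting discussed in the issue. With
# parent-first resolution, user classes are loaded through the parent
# (AppClassLoader) where they are on the classpath, instead of a fresh
# child classloader per job restart.
classloader.resolve-order: parent-first
```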
[jira] [Updated] (FLINK-11205) Task Manager Metaspace Memory Leak
[ https://issues.apache.org/jira/browse/FLINK-11205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nawaid Shamim updated FLINK-11205: -- Attachment: (was: Screenshot 2018-12-18 at 12.12.15.png) > Task Manager Metaspace Memory Leak > --- > > Key: FLINK-11205 > URL: https://issues.apache.org/jira/browse/FLINK-11205 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Nawaid Shamim >Priority: Major > Attachments: Screenshot 2018-12-18 at 12.14.11.png > > > Job Restarts causes task manager to dynamically load duplicate classes. > Metaspace is unbounded and grows with every restart. YARN aggressively kill > such containers but this affect is immediately seems on different task > manager which results in death spiral. > Task Manager uses dynamic loader as described in > [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html] > {quote} > *YARN* > YARN classloading differs between single job deployments and sessions: > * When submitting a Flink job/application directly to YARN (via {{bin/flink > run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are > started for that job. Those JVMs have both Flink framework classes and user > code classes in the Java classpath. That means that there is _no dynamic > classloading_ involved in that case. > * When starting a YARN session, the JobManagers and TaskManagers are started > with the Flink framework classes in the classpath. The classes from all jobs > that are submitted against the session are loaded dynamically. > {quote} > The above is not entirely true specially when you set {{-yD > classloader.resolve-order=parent-first}} . We also above observed the above > behaviour when submitting a Flink job/application directly to YARN (via > {{bin/flink run -m yarn-cluster ...}}). > !Screenshot 2018-12-18 at 12.14.11.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11205) Task Manager Metaspace Memory Leak
[ https://issues.apache.org/jira/browse/FLINK-11205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nawaid Shamim updated FLINK-11205: -- Attachment: Screenshot 2018-12-18 at 12.12.15.png > Task Manager Metaspace Memory Leak > --- > > Key: FLINK-11205 > URL: https://issues.apache.org/jira/browse/FLINK-11205 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Nawaid Shamim >Priority: Major > Attachments: Screenshot 2018-12-18 at 12.12.15.png, Screenshot > 2018-12-18 at 12.14.11.png > > > Job Restarts causes task manager to dynamically load duplicate classes. > Metaspace is unbounded and grows with every restart. YARN aggressively kill > such containers but this affect is immediately seems on different task > manager which results in death spiral. > Task Manager uses dynamic loader as described in > [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html] > {quote} > *YARN* > YARN classloading differs between single job deployments and sessions: > * When submitting a Flink job/application directly to YARN (via {{bin/flink > run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are > started for that job. Those JVMs have both Flink framework classes and user > code classes in the Java classpath. That means that there is _no dynamic > classloading_ involved in that case. > * When starting a YARN session, the JobManagers and TaskManagers are started > with the Flink framework classes in the classpath. The classes from all jobs > that are submitted against the session are loaded dynamically. > {quote} > The above is not entirely true specially when you set {{-yD > classloader.resolve-order=parent-first}} . We also above observed the above > behaviour when submitting a Flink job/application directly to YARN (via > {{bin/flink run -m yarn-cluster ...}}). > !Screenshot 2018-12-18 at 12.14.11.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-9080) Flink Scheduler goes OOM, suspecting a memory leak
[ https://issues.apache.org/jira/browse/FLINK-9080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nawaid Shamim updated FLINK-9080: - Attachment: Screenshot 2018-12-18 at 12.14.11.png > Flink Scheduler goes OOM, suspecting a memory leak > -- > > Key: FLINK-9080 > URL: https://issues.apache.org/jira/browse/FLINK-9080 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.4.0 >Reporter: Rohit Singh >Assignee: Stefan Richter >Priority: Major > Attachments: Screenshot 2018-12-18 at 12.14.11.png, Top Level > packages.JPG, Top level classes.JPG, classesloaded vs unloaded.png > > > Running FLink version 1.4.0. on mesos,scheduler running along with job > manager in single container, whereas task managers running in seperate > containers. > Couple of jobs were running continously, Flink scheduler was working > properlyalong with task managers. Due to some change in data, one of the jobs > started failing continuously. In the meantime,there was a surge in flink > scheduler memory usually eventually died out off OOM > > Memory dump analysis was done, > Following were findings !Top Level packages.JPG!!Top level classes.JPG! > * Majority of top loaded packages retaining heap indicated towards > Flinkuserclassloader, glassfish(jersey library), Finalizer classes. (Top > level package image) > * Top level classes were of Flinkuserclassloader, (Top Level class image) > * The number of classes loaded vs unloaded was quite less PFA,inspite of > adding jvm options of -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled , > PFAclassloaded vs unloaded graph, scheduler was restarted 3 times > * There were custom classes as well which were duplicated during subsequent > class uploads > PFA all the images of heap dump. Can you suggest some pointers on as to how > to overcome this issue. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9080) Flink Scheduler goes OOM, suspecting a memory leak
[ https://issues.apache.org/jira/browse/FLINK-9080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725118#comment-16725118 ] Nawaid Shamim commented on FLINK-9080: -- I noticed similar issue on job restarts - https://issues.apache.org/jira/browse/FLINK-11205 !Screenshot 2018-12-18 at 12.14.11.png! > Flink Scheduler goes OOM, suspecting a memory leak > -- > > Key: FLINK-9080 > URL: https://issues.apache.org/jira/browse/FLINK-9080 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.4.0 >Reporter: Rohit Singh >Assignee: Stefan Richter >Priority: Major > Attachments: Screenshot 2018-12-18 at 12.14.11.png, Top Level > packages.JPG, Top level classes.JPG, classesloaded vs unloaded.png > > > Running FLink version 1.4.0. on mesos,scheduler running along with job > manager in single container, whereas task managers running in seperate > containers. > Couple of jobs were running continously, Flink scheduler was working > properlyalong with task managers. Due to some change in data, one of the jobs > started failing continuously. In the meantime,there was a surge in flink > scheduler memory usually eventually died out off OOM > > Memory dump analysis was done, > Following were findings !Top Level packages.JPG!!Top level classes.JPG! > * Majority of top loaded packages retaining heap indicated towards > Flinkuserclassloader, glassfish(jersey library), Finalizer classes. (Top > level package image) > * Top level classes were of Flinkuserclassloader, (Top Level class image) > * The number of classes loaded vs unloaded was quite less PFA,inspite of > adding jvm options of -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled , > PFAclassloaded vs unloaded graph, scheduler was restarted 3 times > * There were custom classes as well which were duplicated during subsequent > class uploads > PFA all the images of heap dump. Can you suggest some pointers on as to how > to overcome this issue. 
> > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11202) Split log file per job
[ https://issues.apache.org/jira/browse/FLINK-11202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725111#comment-16725111 ] vinoyang commented on FLINK-11202: -- OK, Got it. Before I start this issue, I will provide a design document to discuss. > Split log file per job > -- > > Key: FLINK-11202 > URL: https://issues.apache.org/jira/browse/FLINK-11202 > Project: Flink > Issue Type: Improvement >Reporter: chauncy >Assignee: vinoyang >Priority: Major > > Debugging issues is difficult since Task-/JobManagers create a single log > file in standalone cluster environments. I think having a log file for each > job would be preferable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10317) Configure Metaspace size by default
[ https://issues.apache.org/jira/browse/FLINK-10317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725116#comment-16725116 ] Nawaid Shamim commented on FLINK-10317: --- I noticed similar issue on job restart - https://issues.apache.org/jira/browse/FLINK-11205 !Screenshot 2018-12-18 at 12.14.11.png! > Configure Metaspace size by default > --- > > Key: FLINK-10317 > URL: https://issues.apache.org/jira/browse/FLINK-10317 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > Fix For: 1.6.4, 1.7.2, 1.8.0 > > Attachments: Screenshot 2018-12-18 at 12.14.11.png > > > We should set the size of the JVM Metaspace to a sane default, like > {{-XX:MaxMetaspaceSize=256m}}. > If not set, the JVM offheap memory will grow indefinitely with repeated > classloading and Jitting, eventually exceeding allowed memory on docker/yarn > or similar setups. > It is hard to come up with a good default, however, I believe the error > messages one gets when metaspace is too small are easy to understand (and > easy to take action), while it is very hard to figure out why the memory > footprint keeps growing steadily and infinitely. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10317) Configure Metaspace size by default
[ https://issues.apache.org/jira/browse/FLINK-10317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nawaid Shamim updated FLINK-10317: -- Attachment: Screenshot 2018-12-18 at 12.14.11.png > Configure Metaspace size by default > --- > > Key: FLINK-10317 > URL: https://issues.apache.org/jira/browse/FLINK-10317 > Project: Flink > Issue Type: Bug > Components: Startup Shell Scripts >Affects Versions: 1.5.3, 1.6.0, 1.7.0 >Reporter: Stephan Ewen >Assignee: vinoyang >Priority: Major > Fix For: 1.6.4, 1.7.2, 1.8.0 > > Attachments: Screenshot 2018-12-18 at 12.14.11.png > > > We should set the size of the JVM Metaspace to a sane default, like > {{-XX:MaxMetaspaceSize=256m}}. > If not set, the JVM off-heap memory will grow indefinitely with repeated > classloading and JIT compilation, eventually exceeding allowed memory on docker/yarn > or similar setups. > It is hard to come up with a good default, however, I believe the error > messages one gets when metaspace is too small are easy to understand (and > easy to act on), while it is very hard to figure out why the memory > footprint keeps growing steadily and infinitely. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11202) Split log file per job
[ https://issues.apache.org/jira/browse/FLINK-11202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725108#comment-16725108 ] Chesnay Schepler commented on FLINK-11202: -- This was brought up before and I can see the point, but we need a design on how to implement this that doesn't require a full rewrite of all of our logging code. > Split log file per job > -- > > Key: FLINK-11202 > URL: https://issues.apache.org/jira/browse/FLINK-11202 > Project: Flink > Issue Type: Improvement >Reporter: chauncy >Assignee: vinoyang >Priority: Major > > Debugging issues is difficult since Task-/JobManagers create a single log > file in standalone cluster environments. I think having a log file for each > job would be preferable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11201) flink-test-utils dependency issue
[ https://issues.apache.org/jira/browse/FLINK-11201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725115#comment-16725115 ] eugen yushin commented on FLINK-11201: -- It's a behavior change that is not reflected in the docs. If the code from the description works fine in your environment, I have no questions. But it doesn't work on the two dev machines I have. > flink-test-utils dependency issue > - > > Key: FLINK-11201 > URL: https://issues.apache.org/jira/browse/FLINK-11201 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.0 >Reporter: eugen yushin >Priority: Major > > Starting with Flink 1.7, the > `runtime.testutils.MiniClusterResource` class is missing from the `flink-test-utils` > distribution. > Steps to reproduce (Scala code) > build.sbt > {code} > name := "flink-17-test-issue" > organization := "x.y.z" > scalaVersion := "2.11.12" > val flinkVersion = "1.7.0" > libraryDependencies ++= Seq( > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided, > "org.scalatest" %% "scalatest" % "3.0.5" % Test, > "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test > // ,"org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier > Artifact.TestsClassifier > ) > {code} > test class: > {code} > class SimpleTest extends AbstractTestBase with FlatSpecLike { > implicit val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > env.setParallelism(1) > env.setRestartStrategy(RestartStrategies.noRestart()) > "SimpleTest" should "work" in { > val inputDs = env.fromElements(1,2,3) > inputDs.print() > env.execute() > } > } > {code} > Results in: > {code} > A needed class was not found. This could be due to an error in your runpath. > Missing class: org/apache/flink/runtime/testutils/MiniClusterResource > java.lang.NoClassDefFoundError: > org/apache/flink/runtime/testutils/MiniClusterResource > ... 
> Caused by: java.lang.ClassNotFoundException: > org.apache.flink.runtime.testutils.MiniClusterResource > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > ... 31 more > {code} > This can be fixed by adding the flink-runtime distribution with the test > classifier to the dependencies list. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
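The workaround mentioned at the end is already hinted at by the commented-out line in the `build.sbt` above; spelled out as an sbt fragment (assuming the same `flinkVersion` value):

```scala
// Workaround: pull in flink-runtime's test-jar, which contains
// org.apache.flink.runtime.testutils.MiniClusterResource in 1.7.0
libraryDependencies +=
  "org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier Artifact.TestsClassifier
```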
[jira] [Updated] (FLINK-11205) Task Manager Metaspace Memory Leak
[ https://issues.apache.org/jira/browse/FLINK-11205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nawaid Shamim updated FLINK-11205: -- Description: Job restarts cause the task manager to dynamically load duplicate classes. Metaspace is unbounded and grows with every restart. YARN aggressively kills such containers, but the effect immediately appears on a different task manager, which results in a death spiral. The Task Manager uses dynamic classloading as described in [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html] {quote} *YARN* YARN classloading differs between single job deployments and sessions: * When submitting a Flink job/application directly to YARN (via {{bin/flink run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are started for that job. Those JVMs have both Flink framework classes and user code classes in the Java classpath. That means that there is _no dynamic classloading_ involved in that case. * When starting a YARN session, the JobManagers and TaskManagers are started with the Flink framework classes in the classpath. The classes from all jobs that are submitted against the session are loaded dynamically. {quote} The above is not entirely true, especially when you set {{-yD classloader.resolve-order=parent-first}}. We also observed the above behaviour when submitting a Flink job/application directly to YARN (via {{bin/flink run -m yarn-cluster ...}}). !Screenshot 2018-12-18 at 12.14.11.png! was: Job restarts cause the task manager to dynamically load duplicate classes. Metaspace is unbounded and grows with every restart. YARN aggressively kills such containers, but the effect immediately appears on a different task manager, which results in a death spiral. !Screenshot 2018-12-18 at 12.14.11.png!thumbnail! 
The Task Manager uses dynamic classloading as described in [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html] {quote} *YARN* YARN classloading differs between single job deployments and sessions: * When submitting a Flink job/application directly to YARN (via {{bin/flink run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are started for that job. Those JVMs have both Flink framework classes and user code classes in the Java classpath. That means that there is _no dynamic classloading_ involved in that case. * When starting a YARN session, the JobManagers and TaskManagers are started with the Flink framework classes in the classpath. The classes from all jobs that are submitted against the session are loaded dynamically. {quote} The above is not entirely true, especially when you set {{-yD classloader.resolve-order=parent-first}}. We also observed the above behaviour when submitting a Flink job/application directly to YARN (via {{bin/flink run -m yarn-cluster ...}}). > Task Manager Metaspace Memory Leak > --- > > Key: FLINK-11205 > URL: https://issues.apache.org/jira/browse/FLINK-11205 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.5, 1.6.2, 1.7.0 >Reporter: Nawaid Shamim >Priority: Major > Attachments: Screenshot 2018-12-18 at 12.14.11.png > > > Job restarts cause the task manager to dynamically load duplicate classes. > Metaspace is unbounded and grows with every restart. YARN aggressively kills > such containers, but the effect immediately appears on a different task > manager, which results in a death spiral. 
> The Task Manager uses dynamic classloading as described in > [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html] > {quote} > *YARN* > YARN classloading differs between single job deployments and sessions: > * When submitting a Flink job/application directly to YARN (via {{bin/flink > run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are > started for that job. Those JVMs have both Flink framework classes and user > code classes in the Java classpath. That means that there is _no dynamic > classloading_ involved in that case. > * When starting a YARN session, the JobManagers and TaskManagers are started > with the Flink framework classes in the classpath. The classes from all jobs > that are submitted against the session are loaded dynamically. > {quote} > The above is not entirely true, especially when you set {{-yD > classloader.resolve-order=parent-first}}. We also observed the above > behaviour when submitting a Flink job/application directly to YARN (via > {{bin/flink run -m yarn-cluster ...}}). > !Screenshot 2018-12-18 at 12.14.11.png! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
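For reference, the property discussed above can also be set in the configuration file rather than per submission; a sketch (the key exists in Flink, though whether parent-first resolution mitigates this particular leak is exactly what the report questions):

```yaml
# flink-conf.yaml equivalent of -yD classloader.resolve-order=parent-first:
# user classes resolve through the framework classloader first, changing
# how classes are (re)loaded across job restarts
classloader.resolve-order: parent-first
```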
[jira] [Created] (FLINK-11205) Task Manager Metaspace Memory Leak
Nawaid Shamim created FLINK-11205: - Summary: Task Manager Metaspace Memory Leak Key: FLINK-11205 URL: https://issues.apache.org/jira/browse/FLINK-11205 Project: Flink Issue Type: Bug Affects Versions: 1.7.0, 1.6.2, 1.5.5 Reporter: Nawaid Shamim Attachments: Screenshot 2018-12-18 at 12.14.11.png Job restarts cause the task manager to dynamically load duplicate classes. Metaspace is unbounded and grows with every restart. YARN aggressively kills such containers, but the effect immediately appears on a different task manager, which results in a death spiral. !Screenshot 2018-12-18 at 12.14.11.png!width=480! The Task Manager uses dynamic classloading as described in [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html] {quote} *YARN* YARN classloading differs between single job deployments and sessions: * When submitting a Flink job/application directly to YARN (via {{bin/flink run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are started for that job. Those JVMs have both Flink framework classes and user code classes in the Java classpath. That means that there is _no dynamic classloading_ involved in that case. * When starting a YARN session, the JobManagers and TaskManagers are started with the Flink framework classes in the classpath. The classes from all jobs that are submitted against the session are loaded dynamically. {quote} The above is not entirely true, especially when you set {{-yD classloader.resolve-order=parent-first}}. We also observed the above behaviour when submitting a Flink job/application directly to YARN (via {{bin/flink run -m yarn-cluster ...}}). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (FLINK-11202) Split log file per job
[ https://issues.apache.org/jira/browse/FLINK-11202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] vinoyang reassigned FLINK-11202: Assignee: vinoyang > Split log file per job > -- > > Key: FLINK-11202 > URL: https://issues.apache.org/jira/browse/FLINK-11202 > Project: Flink > Issue Type: Improvement >Reporter: chauncy >Assignee: vinoyang >Priority: Major > > Debugging issues is difficult since Task-/JobManagers create a single log > file in standalone cluster environments. I think having a log file for each > job would be preferable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11202) Split log file per job
[ https://issues.apache.org/jira/browse/FLINK-11202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725083#comment-16725083 ] vinoyang commented on FLINK-11202: -- [~Zentol] Do you think we should split log file per job for both job manager and task manager? > Split log file per job > -- > > Key: FLINK-11202 > URL: https://issues.apache.org/jira/browse/FLINK-11202 > Project: Flink > Issue Type: Improvement >Reporter: chauncy >Priority: Major > > Debugging issues is difficult since Task-/JobManagers create a single log > file in standalone cluster environments. I think having a log file for each > job would be preferable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
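One possible design direction that avoids rewriting Flink's logging code is routing on a log context key; a hypothetical sketch assuming log4j2 and a `jobId` MDC key that Flink does not currently set (both are assumptions for illustration, not the agreed design):

```xml
<!-- log4j2.xml sketch: route log events to one file per job via a
     hypothetical "jobId" thread-context (MDC) key. Flink would have
     to populate this key on task threads for the routing to work. -->
<Routing name="PerJob">
  <Routes pattern="$${ctx:jobId}">
    <Route>
      <File name="job-${ctx:jobId}" fileName="logs/job-${ctx:jobId}.log">
        <PatternLayout pattern="%d %-5p %c{1} - %m%n"/>
      </File>
    </Route>
  </Routes>
</Routing>
```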
[GitHub] zentol removed a comment on issue #7338: Fixed comment about scala versions.
zentol removed a comment on issue #7338: Fixed comment about scala versions. URL: https://github.com/apache/flink/pull/7338#issuecomment-448621648 This is incorrect as Flink 1.7 also builds with scala 2.12. The entire section nee This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on issue #7338: Fixed comment about scala versions.
zentol commented on issue #7338: Fixed comment about scala versions. URL: https://github.com/apache/flink/pull/7338#issuecomment-448621648 This is incorrect as Flink 1.7 also builds with scala 2.12. The entire section nee This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on issue #7338: Fixed comment about scala versions.
zentol commented on issue #7338: Fixed comment about scala versions. URL: https://github.com/apache/flink/pull/7338#issuecomment-448621675 This is incorrect as Flink 1.7 also builds with scala 2.12. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-11202) Split log file per job
[ https://issues.apache.org/jira/browse/FLINK-11202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11202: - Description: Debugging issues is difficult since Task-/JobManagers create a single log file in standalone cluster environments. I think having a log file for each job would be preferable. (was: find bug is difficult due totask manager and job manager's log into one big log file with standalone cluster env , i think each job has a log file is profile ) > Split log file per job > -- > > Key: FLINK-11202 > URL: https://issues.apache.org/jira/browse/FLINK-11202 > Project: Flink > Issue Type: Improvement >Reporter: chauncy >Priority: Major > > Debugging issues is difficult since Task-/JobManagers create a single log > file in standalone cluster environments. I think having a log file for each > job would be preferable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11202) Split log file per job
[ https://issues.apache.org/jira/browse/FLINK-11202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11202: - Summary: Split log file per job (was: log split file, every job has one log file ) > Split log file per job > -- > > Key: FLINK-11202 > URL: https://issues.apache.org/jira/browse/FLINK-11202 > Project: Flink > Issue Type: Improvement >Reporter: chauncy >Priority: Major > > find bug is difficult due totask manager and job manager's log into one > big log file with standalone cluster env , i think each job has a log file > is profile -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725045#comment-16725045 ] Chesnay Schepler commented on FLINK-11162: -- No. As I said _before_ you opened your PR, I don't think we're willing to do these changes right now. > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: lining >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The context of this issue: > we are using the operator's metric variables: , > . > We have customized how these metrics are displayed. For query purposes, > we currently lack an interface to get all the logical operators of a job. The > current REST API only provides the chained node information. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
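For context, what the existing REST API does expose is the chained execution plan (`GET /jobs/:jobid/plan`). A minimal sketch of listing those chained nodes from such a response; the payload below is a hand-written sample whose exact shape is an assumption for illustration, not captured output:

```python
import json

# Hand-written sample shaped like a /jobs/:jobid/plan response
# (assumed structure for illustration; real payloads carry more fields).
sample = json.loads("""
{
  "plan": {
    "jid": "abc123",
    "name": "example-job",
    "nodes": [
      {"id": "n1", "parallelism": 2, "description": "Source: Custom Source"},
      {"id": "n2", "parallelism": 2, "description": "Map -> Sink: Print"}
    ]
  }
}
""")

def chained_nodes(plan_response):
    """Return (id, description) for each *chained* node in the plan.

    Logical operators fused into a single chain (e.g. the Map and the
    Sink above) appear as one node, which is the gap this issue raises.
    """
    return [(n["id"], n["description"]) for n in plan_response["plan"]["nodes"]]

for node_id, desc in chained_nodes(sample):
    print(node_id, desc)
```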
[jira] [Commented] (FLINK-11200) Port DataView classes to flink-table-common
[ https://issues.apache.org/jira/browse/FLINK-11200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725040#comment-16725040 ] Hequn Cheng commented on FLINK-11200: - [~twalthr] Makes sense, I will try to follow your idea. > Port DataView classes to flink-table-common > --- > > Key: FLINK-11200 > URL: https://issues.apache.org/jira/browse/FLINK-11200 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Hequn Cheng >Priority: Major > > {{DataView}} are used within aggregate functions for more efficient state > management. Logically, they should have been ported in FLINK-10689. > This issue only includes {{org.apache.flink.table.api.dataview.*}} and > {{org.apache.flink.table.dataview.StateListView/StateMapView}}. The latter > one is shared between planning and runtime phase. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twalthr commented on issue #7318: [FLINK-11099][Table API] Migrate flink-table runtime CRow Types classes
twalthr commented on issue #7318: [FLINK-11099][Table API] Migrate flink-table runtime CRow Types classes URL: https://github.com/apache/flink/pull/7318#issuecomment-448610899 @XuQianJin-Stars no need for a new PR, just update the branch or do a force push. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Comment Edited] (FLINK-11200) Port DataView classes to flink-table-common
[ https://issues.apache.org/jira/browse/FLINK-11200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725032#comment-16725032 ] Timo Walther edited comment on FLINK-11200 at 12/19/18 2:14 PM: [~hequn8128] You are right. I hadn't seen the transitive class dependencies. Would be great if we can find a better solution with cleaner interfaces than just porting {{ListViewTypeInfo}}, {{ListViewSerializer}} into {{table-common}}. In my opinion, those classes don't fit 100% into a module that is shared across a lot of Flink modules. But if you find no cleaner way while maintaining backwards compatibility, feel free to port them as well. was (Author: twalthr): You are right. I hadn't seen the transitive class dependencies. Would be great if we can find a better solution with cleaner interfaces than just porting {{ListViewTypeInfo}}, {{ListViewSerializer}} into {{table-common}}. In my opinion, those classes don't fit 100% into a module that is shared across a lot of Flink modules. But if you find no cleaner way while maintaining backwards compatibility, feel free to port them as well. > Port DataView classes to flink-table-common > --- > > Key: FLINK-11200 > URL: https://issues.apache.org/jira/browse/FLINK-11200 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Hequn Cheng >Priority: Major > > {{DataView}} are used within aggregate functions for more efficient state > management. Logically, they should have been ported in FLINK-10689. > This issue only includes {{org.apache.flink.table.api.dataview.*}} and > {{org.apache.flink.table.dataview.StateListView/StateMapView}}. The latter > one is shared between planning and runtime phase. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-11200) Port DataView classes to flink-table-common
[ https://issues.apache.org/jira/browse/FLINK-11200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725032#comment-16725032 ] Timo Walther commented on FLINK-11200: -- You are right. I hadn't seen the transitive class dependencies. Would be great if we can find a better solution with cleaner interfaces than just porting {{ListViewTypeInfo}}, {{ListViewSerializer}} into {{table-common}}. In my opinion, those classes don't fit 100% into a module that is shared across a lot of Flink modules. But if you find no cleaner way while maintaining backwards compatibility, feel free to port them as well. > Port DataView classes to flink-table-common > --- > > Key: FLINK-11200 > URL: https://issues.apache.org/jira/browse/FLINK-11200 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Timo Walther >Assignee: Hequn Cheng >Priority: Major > > {{DataView}} are used within aggregate functions for more efficient state > management. Logically, they should have been ported in FLINK-10689. > This issue only includes {{org.apache.flink.table.api.dataview.*}} and > {{org.apache.flink.table.dataview.StateListView/StateMapView}}. The latter > one is shared between planning and runtime phase. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] igalshilman commented on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
igalshilman commented on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers. URL: https://github.com/apache/flink/pull/7313#issuecomment-448601803 Good point @kl0u, however in HDFS there would be a single writer at a time, protected by the writer lease. Thanks for addressing the review comments. I think the tests can be made a bit more readable, but we may want to follow up on this as part of a different issue. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11204) flink start-scala-shell.sh do not work in security mode with kerberos authentication.
[ https://issues.apache.org/jira/browse/FLINK-11204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724997#comment-16724997 ] Jeff Zhang commented on FLINK-11204: Do you run kinit before starting the scala-shell? > flink start-scala-shell.sh do not work in security mode with kerberos > authentication. > - > > Key: FLINK-11204 > URL: https://issues.apache.org/jira/browse/FLINK-11204 > Project: Flink > Issue Type: Bug > Components: Scala Shell >Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0 >Reporter: kelun wang >Priority: Minor > > Hello, > When using flink start-scala-shell.sh in a cluster with kerberos credentials, > the script does not support Kerberos authentication, and errors like the ones below > occur: > 1) Failure to deploy the YARN cluster. > > {code:java} > start-scala-shell.sh yarn -n 3 > Exception in thread "main" java.lang.RuntimeException: Error deploying the > YARN cluster > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:594) > at > org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:81) > at > org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:256) > at > org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:165) > at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:189) > at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:188) > at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:137) > at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala) > Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm > at > org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65) > at > org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:318) > at > org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:303) > at > org.apache.hadoop.security.UserGroupInformation.isAuthenticationMethodEnabled(UserGroupInformation.java:391) > at > org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:385) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:384) > at > org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:351) > ... 8 more > {code} > > 2) Failure to fetch a deployed Flink cluster when using the following command: > bin/start-scala-shell.sh yarn > > {code:java} > def fetchDeployedYarnClusterInfo( > configuration: Configuration, > configurationDirectory: String) = { > val args = ArrayBuffer[String]( > "-m", "yarn-cluster" > ) > {code} > When fetching the deployed yarn cluster with the param "-m yarn-cluster", it will > create a new one, but since it has no "-n", it still fails.
[GitHub] joerg84 commented on issue #7338: Fixed comment about scala versions.
joerg84 commented on issue #7338: Fixed comment about scala versions. URL: https://github.com/apache/flink/pull/7338#issuecomment-448596345 @fhueske PTAL
[GitHub] joerg84 opened a new pull request #7338: Fixed comment about scala versions.
joerg84 opened a new pull request #7338: Fixed comment about scala versions. URL: https://github.com/apache/flink/pull/7338 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluser with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request 
introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[GitHub] jinglining commented on issue #7325: [FLINK-11162][rest] Provide a rest API to list all logical operators
jinglining commented on issue #7325: [FLINK-11162][rest] Provide a rest API to list all logical operators URL: https://github.com/apache/flink/pull/7325#issuecomment-448594791 @zentol, can you review it?
[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators
[ https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724995#comment-16724995 ] lining commented on FLINK-11162: [~Zentol] I have pushed the code; can you review it? > Provide a rest API to list all logical operators > > > Key: FLINK-11162 > URL: https://issues.apache.org/jira/browse/FLINK-11162 > Project: Flink > Issue Type: New Feature > Components: REST >Reporter: vinoyang >Assignee: lining >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > The scene of this issue: > We are using the indicator variable of the operator: , > . > We have customized the display of the indicator. Based on the query purpose, > we currently lack an interface to get all the logical operators of a job. The > current rest API only provides the chained node information.
[jira] [Updated] (FLINK-11204) flink start-scala-shell.sh do not work in security mode with kerberos authentication.
[ https://issues.apache.org/jira/browse/FLINK-11204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] kelun wang updated FLINK-11204: --- Description: Hello, When using flink start-scala-shell.sh in a cluster with kerberos credential, the script does not supports Kerberos authentication, errors like below will occur: 1) Fail to deploy Yarn cluster. {code:java} start-scala-shell.sh yarn -n 3 Exception in thread "main" java.lang.RuntimeException: Error deploying the YARN cluster at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:594) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:81) at org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:256) at org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:165) at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:189) at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:188) at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:137) at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala) Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm at org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:318) at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:303) at org.apache.hadoop.security.UserGroupInformation.isAuthenticationMethodEnabled(UserGroupInformation.java:391) at org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:385) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:384) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:351) ... 
8 more {code} 2)Fail to fetch deployed a Flink cluster, when using the following command : bin/start-scala-shell.sh yarn {code:java} def fetchDeployedYarnClusterInfo( configuration: Configuration, configurationDirectory: String) = { val args = ArrayBuffer[String]( "-m", "yarn-cluster" ) {code} when fething deployed yarn cluster, with param "-m yarn-cluster" it will create new one, but has no "-n", still fail. was: Hello, When using flink start-scala-shell.sh in a cluster with kerberos credential, the script do not supports Kerberos authentication, errors like below will occur: 1) Fail to deploy Yarn cluster. {code:java} start-scala-shell.sh yarn -n 3 Exception in thread "main" java.lang.RuntimeException: Error deploying the YARN cluster at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:594) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:81) at org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:256) at org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:165) at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:189) at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:188) at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:137) at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala) Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm at org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:318) at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:303) at org.apache.hadoop.security.UserGroupInformation.isAuthenticationMethodEnabled(UserGroupInformation.java:391) at org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:385) at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:384) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:351) ... 8 more {code} 2)Fail to fetch deployed a Flink cluster, when using the following command : bin/start-scala-shell.sh yarn {code:java} def fetchDeployedYarnClusterInfo( configuration: Configuration, configurationDirectory: String) = { val args = ArrayBuffer[String]( "-m", "yarn-cluster" ) {code} when fething deployed yarn cluster, with param "-m yarn-cluster" it will create new one, but has no "-n", still fail. > flink start-scala-shell.sh do not work in security mode with kerberos > authentication. > - > > Key: FLINK-11204 > URL: https://issues.apache.org/jira/browse/FLINK-11204 > Project: Flink > Issue
[GitHub] zentol commented on issue #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
zentol commented on issue #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#issuecomment-448593297 Will address my last comment while merging.
[jira] [Created] (FLINK-11204) flink start-scala-shell.sh do not work in security mode with kerberos authentication.
kelun wang created FLINK-11204: -- Summary: flink start-scala-shell.sh do not work in security mode with kerberos authentication. Key: FLINK-11204 URL: https://issues.apache.org/jira/browse/FLINK-11204 Project: Flink Issue Type: Bug Components: Scala Shell Affects Versions: 1.7.0, 1.6.2, 1.5.5, 1.4.2, 1.3.3 Reporter: kelun wang Hello, When using flink start-scala-shell.sh in a cluster with kerberos credential, the script do not supports Kerberos authentication, errors like below will occur: 1) Fail to deploy Yarn cluster. {code:java} start-scala-shell.sh yarn -n 3 Exception in thread "main" java.lang.RuntimeException: Error deploying the YARN cluster at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:594) at org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:81) at org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:256) at org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:165) at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:189) at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:188) at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:137) at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala) Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm at org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:318) at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:303) at org.apache.hadoop.security.UserGroupInformation.isAuthenticationMethodEnabled(UserGroupInformation.java:391) at org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:385) at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:384) at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:351) ... 8 more {code} 2) Failure to fetch a deployed Flink cluster when using the following command: bin/start-scala-shell.sh yarn {code:java} def fetchDeployedYarnClusterInfo( configuration: Configuration, configurationDirectory: String) = { val args = ArrayBuffer[String]( "-m", "yarn-cluster" ) {code} When fetching the deployed yarn cluster with the param "-m yarn-cluster", it will create a new one, but since it has no "-n", it still fails.
[GitHub] dawidwys commented on issue #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
dawidwys commented on issue #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#issuecomment-448590963 Just to sum up my review: - Personally, I would prefer the method `execute(String jobName, SavepointSettings savepointSettings)`, but I am ok with the ctor as well - Before merging this PR, please annotate the newly introduced methods/ctor with `@Internal` or at least `@PublicEvolving`. After that I am +1.
[GitHub] zentol closed pull request #7319: [hotfix][typo] fix typo in MLUtils
zentol closed pull request #7319: [hotfix][typo] fix typo in MLUtils URL: https://github.com/apache/flink/pull/7319 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala index f4119f5ab60..4c4072a00bb 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala @@ -35,7 +35,7 @@ import org.apache.flink.util.Collector * The file format is specified [http://svmlight.joachims.org/ here]. * * - writeLibSVM: - * Writes a data set of [[LabeledVector]] in libSVM/SVMLight format to disk. THe file format + * Writes a data set of [[LabeledVector]] in libSVM/SVMLight format to disk. The file format * is specified [http://svmlight.joachims.org/ here]. */ object MLUtils {
[GitHub] zentol closed pull request #7337: [hotfix] Fix typos in ExecutionGraph
zentol closed pull request #7337: [hotfix] Fix typos in ExecutionGraph URL: https://github.com/apache/flink/pull/7337 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 56315e07146..cf00640dbaa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -811,16 +811,16 @@ public Executor getFutureExecutor() { // Actions // - public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException { + public void attachJobGraph(List<JobVertex> topologicallySorted) throws JobException { LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " + "vertices and {} intermediate results.", - topologiallySorted.size(), tasks.size(), intermediateResults.size()); + topologicallySorted.size(), tasks.size(), intermediateResults.size()); - final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size()); + final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologicallySorted.size()); final long createTimestamp = System.currentTimeMillis(); - for (JobVertex jobVertex : topologiallySorted) { + for (JobVertex jobVertex : topologicallySorted) { if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) { this.isStoppable = false;
[GitHub] zentol commented on issue #7337: [hotfix] Fix typos in ExecutionGraph
zentol commented on issue #7337: [hotfix] Fix typos in ExecutionGraph URL: https://github.com/apache/flink/pull/7337#issuecomment-448589639 This typo does not affect the semantics of the variable, but merging this PR would obfuscate the git history. I will hence close the PR; it just doesn't _really_ add value.
[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r242907203 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ## @@ -169,33 +173,66 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig } } - @Override - public JobExecutionResult execute(String jobName) throws ProgramInvocationException { - StreamGraph streamGraph = getStreamGraph(); - streamGraph.setJobName(jobName); - transformations.clear(); - return executeRemotely(streamGraph, jarFiles); + /** +* Set savepoint restore settings that will be used when executing the job. +* @param savepointRestoreSettings savepoint restore settings +*/ + public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) { + this.savepointRestoreSettings = savepointRestoreSettings; Review comment: This argument was countered by the fact that the other parameters are also not per-execution. I don't mind; either the constructor or the method works for me. Perhaps we could change the constructor to the method above @tweise? I think we should have reached consensus then :)
[GitHub] kisimple opened a new pull request #7337: [hotfix] Fix typos in ExecutionGraph
kisimple opened a new pull request #7337: [hotfix] Fix typos in ExecutionGraph URL: https://github.com/apache/flink/pull/7337 topologially -> topologically. Rename the parameter.
[jira] [Commented] (FLINK-11201) flink-test-utils dependency issue
[ https://issues.apache.org/jira/browse/FLINK-11201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724969#comment-16724969 ] Chesnay Schepler commented on FLINK-11201: -- In 1.6 the MiniClusterResource resided entirely in flink-test-utils. In 1.7 we moved the core logic to flink-runtime (as test classes), retained the existing class in flink-test-utils as an alias (to not break existing tests), and added a dependency on flink-runtime:test-jar to flink-test-utils. Looking at the dependency tree with maven the test-jar dependency on flink-runtime is correctly pulled in when depending on flink-test-utils; so as far as I can tell this is either an issue on the side of sbt or there's a magic property that should be set in your build.sbt to include transitive dependencies of test dependencies during testing. In any case I would argue that this is not an issue on the side of Flink. > flink-test-utils dependency issue > - > > Key: FLINK-11201 > URL: https://issues.apache.org/jira/browse/FLINK-11201 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.0 >Reporter: eugen yushin >Priority: Major > > Starting with Flink 1.7, there's lack of > `runtime.testutils.MiniClusterResource` class in `flink-test-utils` > distribution. 
> Steps to reproduce (Scala code) > build.sbt > {code} > name := "flink-17-test-issue" > organization := "x.y.z" > scalaVersion := "2.11.12" > val flinkVersion = "1.7.0" > libraryDependencies ++= Seq( > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided, > "org.scalatest" %% "scalatest" % "3.0.5" % Test, > "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test > // ,"org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier > Artifact.TestsClassifier > ) > {code} > test class: > {code} > class SimpleTest extends AbstractTestBase with FlatSpecLike { > implicit val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > env.setParallelism(1) > env.setRestartStrategy(RestartStrategies.noRestart()) > "SimpleTest" should "work" in { > val inputDs = env.fromElements(1,2,3) > inputDs.print() > env.execute() > } > } > {code} > Results in: > {code} > A needed class was not found. This could be due to an error in your runpath. > Missing class: org/apache/flink/runtime/testutils/MiniClusterResource > java.lang.NoClassDefFoundError: > org/apache/flink/runtime/testutils/MiniClusterResource > ... > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.runtime.testutils.MiniClusterResource > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > ... 31 more > {code} > This can be fixed by adding flink-runtime distribution with test classifier > into dependencies list.
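Following Chesnay's analysis of FLINK-11201, the user-side workaround is exactly the commented-out line in the build.sbt above: declare the flink-runtime test-jar explicitly. A sketch of the adjusted dependency list (a workaround under the assumption that sbt does not resolve transitive test-jar dependencies, not an official Flink recommendation):

```scala
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  "org.scalatest"    %% "scalatest"             % "3.0.5"      % Test,
  "org.apache.flink" %% "flink-test-utils"      % flinkVersion % Test,
  // pulls in the test classes (including MiniClusterResource) that moved to flink-runtime in 1.7
  "org.apache.flink" %% "flink-runtime"         % flinkVersion % Test classifier Artifact.TestsClassifier
)
```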
[jira] [Created] (FLINK-11203) FunctionContext of AggregateFunction will not be initialized for window GroupBy
Hequn Cheng created FLINK-11203: --- Summary: FunctionContext of AggregateFunction will not be initialized for window GroupBy Key: FLINK-11203 URL: https://issues.apache.org/jira/browse/FLINK-11203 Project: Flink Issue Type: Improvement Components: Table API SQL Reporter: Hequn Cheng Assignee: Hequn Cheng Currently, in the Table API/SQL, the implementation of group-window aggregations is based on the WindowedStream and {{org.apache.flink.api.common.functions.AggregateFunction}}. Due to FLINK-11198, metrics cannot be accessed within {{org.apache.flink.table.functions.AggregateFunction}} either. It would be nice if we supported metrics for both of them.
[GitHub] azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r242903778 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ## @@ -686,6 +697,17 @@ public RocksDBNativeMetricOptions getMemoryWatcherOptions() { return options; } + /** +* Gets the thread number will used for downloading files from DFS when restore. +*/ + public int getNumberOfRestoringThreads() { + return numberOfRestoringThreads == -1 ? RocksDBOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue() : numberOfRestoringThreads; Review comment: Let's keep it in sync with the logic for `enableIncrementalCheckpointing` for now. Later, we can revisit/refactor the backend configuration.
[GitHub] zentol opened a new pull request #7336: [FLINK-11194][hbase] Use type instead of classifier
zentol opened a new pull request #7336: [FLINK-11194][hbase] Use type instead of classifier URL: https://github.com/apache/flink/pull/7336 ## What is the purpose of the change The hbase-connector module has been disabling the shade-plugin due to an infinite loop that occurs when creating the dependency-reduced pom. As a result, the released pom contained unresolved properties for the scala version, which breaks the connector when using scala 2.12. The loop is caused by having multiple instances of the same dependency with different classifiers. Classifiers are suffixes that are added to jars; most notably "tests" for the test-jar. The plugin however _can_ handle multiple instances with different _types_, like `test-jar`. Since we refer to the same artifact regardless, we can just use the `<type>` parameter to select the test-jar, preventing the loop. ## Brief change log * use `<type>` instead of `<classifier>` * remove custom shade-plugin configuration ## Verifying this change Compile the module and ensure that the dependency-reduced-pom.xml is created and does not contain unresolved properties.
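The change described above can be illustrated with a hedged pom fragment — the dependency coordinates here are made up for the example; the actual edit is in the hbase connector's pom.xml:

```xml
<!-- before: a classifier-based test dependency, which triggers the
     shade plugin's dependency-reduced-pom loop -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime_${scala.binary.version}</artifactId>
  <version>${project.version}</version>
  <classifier>tests</classifier>
  <scope>test</scope>
</dependency>

<!-- after: the same test artifact, selected by type instead -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime_${scala.binary.version}</artifactId>
  <version>${project.version}</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
```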
[jira] [Updated] (FLINK-11194) missing Scala 2.12 build of HBase connector
[ https://issues.apache.org/jira/browse/FLINK-11194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-11194: --- Labels: artifact build hbase pull-request-available scala (was: artifact build hbase scala) > missing Scala 2.12 build of HBase connector > > > Key: FLINK-11194 > URL: https://issues.apache.org/jira/browse/FLINK-11194 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats, Build System >Affects Versions: 1.7.0 > Environment: Scala version 2.12.7 > Flink version 1.7.0 >Reporter: Zhenhao Li >Assignee: Chesnay Schepler >Priority: Major > Labels: artifact, build, hbase, pull-request-available, scala > > See the following SBT log. > ``` > [error] (update) sbt.librarymanagement.ResolveException: unresolved > dependency: org.apache.flink#flink-hbase_2.12;1.7.0: Resolution failed > several times for dependency: org.apache.flink#flink-hbase_2.12;1.7.0 > \{compile=[default(compile)]}:: > [error] java.text.ParseException: inconsistent module descriptor file > found in > 'https://repo1.maven.org/maven2/org/apache/flink/flink-hbase_2.12/1.7.0/flink-hbase_2.12-1.7.0.pom': > bad module name: expected='flink-hbase_2.12' found='flink-hbase_2.11'; > [error] java.text.ParseException: inconsistent module descriptor file > found in > 'https://repo1.maven.org/maven2/org/apache/flink/flink-hbase_2.12/1.7.0/flink-hbase_2.12-1.7.0.pom': > bad module name: expected='flink-hbase_2.12' found='flink-hbase_2.11'; > ``` 
[GitHub] azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r242901812 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ## @@ -120,6 +120,9 @@ /** This determines if incremental checkpointing is enabled. */ private final TernaryBoolean enableIncrementalCheckpointing; + /** Thread number used to download from DFS when restore, default value: 1. */ + private int numberOfRestoringThreads; Review comment: We introduced a setter for it: `setNumberOfRestoringThreads`, not to explode the number of constructor args like for `enableIncrementalCheckpointing`. At the moment `RocksDBStateBackend` basically serves as a configuration object and factory for the keyed and operator backends.
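For context, the field being discussed above configures how many threads download RocksDB state files from DFS during restore. Independent of Flink's actual implementation, the mechanism can be sketched with plain `java.util.concurrent` — all names below are illustrative, not Flink APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelRestoreDownloadSketch {

    /**
     * Downloads all remote state files using at most numberOfRestoringThreads
     * concurrent workers and returns the locally materialized file names.
     */
    public static List<String> downloadAll(List<String> remoteFiles, int numberOfRestoringThreads)
            throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(numberOfRestoringThreads);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String file : remoteFiles) {
                futures.add(pool.submit(() -> download(file)));
            }
            List<String> local = new ArrayList<>();
            for (Future<String> f : futures) {
                local.add(f.get()); // waits for completion and rethrows the first failure
            }
            return local;
        } finally {
            pool.shutdown();
        }
    }

    // Placeholder for the actual DFS read; here it just echoes the file name.
    private static String download(String remoteFile) {
        return remoteFile;
    }
}
```

With a thread count of 1 this degenerates to sequential downloading, which matches the default value mentioned in the review above.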
[jira] [Commented] (FLINK-11198) Access to MetricGroup in an AggregateFunction(Non Rich)
[ https://issues.apache.org/jira/browse/FLINK-11198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724959#comment-16724959 ] Hequn Cheng commented on FLINK-11198: - [~chiggi_dev] Thanks for opening the issue. It would be nice to make metrics accessible from an AggregateFunction. However, we should find a neat way to do this. > Access to MetricGroup in an AggregateFunction(Non Rich) > --- > > Key: FLINK-11198 > URL: https://issues.apache.org/jira/browse/FLINK-11198 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Affects Versions: 1.6.2 >Reporter: Chirag Dewan >Assignee: Hequn Cheng >Priority: Major > > The only way to add custom metrics from a UDF is through the RuntimeContext, and > the RuntimeContext is wired into every RichFunction implementation. > However, for aggregate() on a windowed stream, we cannot use the Rich version > of AggregateFunction. As far as I understand, this is done to avoid exposing > state in the aggregate UDF. > But can we have some minimal context which does not expose state but provides > metrics, classloader, etc. in the UDF? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
zentol commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector URL: https://github.com/apache/flink/pull/6782#discussion_r242899744 ## File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.cassandra; + +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; + +/** + * Configuration for {@link CassandraSinkBase}. + */ +public class CassandraSinkBaseConfig implements Serializable { + // Default Configurations + + /** +* The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}. +*/ + public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE; + + /** +* The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE}. +*/ + public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE; + + /** +* The default timeout unit when acquiring a permit to execute. 
By default, milliseconds. +*/ + public static final TimeUnit DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS; + + // - Configuration Fields - + + /** Maximum number of concurrent requests allowed. */ + private final int maxConcurrentRequests; + + /** Timeout duration when acquiring a permit to execute. */ + private final long maxConcurrentRequestsTimeout; + + /** Timeout unit when acquiring a permit to execute. */ + private final TimeUnit maxConcurrentRequestsTimeoutUnit; + + public CassandraSinkBaseConfig( + int maxConcurrentRequests, + long maxConcurrentRequestsTimeout, + TimeUnit maxConcurrentRequestsTimeoutUnit) { + Preconditions.checkArgument(maxConcurrentRequests >= 0, + "Max concurrent requests is expected to be positive"); + Preconditions.checkArgument(maxConcurrentRequestsTimeout >= 0, + "Max concurrent requests timeout is expected to be positive"); + Preconditions.checkNotNull(maxConcurrentRequestsTimeoutUnit, + "Max concurrent requests timeout unit cannot be null"); + this.maxConcurrentRequests = maxConcurrentRequests; + this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout; + this.maxConcurrentRequestsTimeoutUnit = maxConcurrentRequestsTimeoutUnit; + } + + public int getMaxConcurrentRequests() { + return maxConcurrentRequests; + } + + public long getMaxConcurrentRequestsTimeout() { + return maxConcurrentRequestsTimeout; + } + + public TimeUnit getMaxConcurrentRequestsTimeoutUnit() { + return maxConcurrentRequestsTimeoutUnit; + } + + @Override + public String toString() { + return "CassandraSinkBaseConfig{" + + "maxConcurrentRequests=" + maxConcurrentRequests + + ", maxConcurrentRequestsTimeout=" + maxConcurrentRequestsTimeout + + ", maxConcurrentRequestsTimeoutUnit=" + maxConcurrentRequestsTimeoutUnit + + '}'; + } + + public static Builder newBuilder() { Review comment: This comment hasn't been addressed yet, the `Builder` constructor is still public. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
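The `maxConcurrentRequests` / timeout pair in the configuration above implements permit-based backpressure: each in-flight write must acquire a permit, and acquisition gives up after the configured timeout instead of blocking forever. A minimal sketch of that mechanism using a plain `java.util.concurrent.Semaphore` — an illustration of the idea, not the actual `CassandraSinkBase` code:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class PermitBackpressureDemo {

    /** Try to admit one more in-flight request within the configured timeout. */
    static boolean tryAdmit(Semaphore permits, long timeoutMs) {
        try {
            return permits.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // maxConcurrentRequests = 2: at most two unacknowledged writes in flight
        Semaphore permits = new Semaphore(2);

        System.out.println(tryAdmit(permits, 10)); // true  (1st request admitted)
        System.out.println(tryAdmit(permits, 10)); // true  (2nd request admitted)
        System.out.println(tryAdmit(permits, 10)); // false (3rd is backpressured, times out)

        // when a write completes, its completion callback releases the permit
        permits.release();
        System.out.println(tryAdmit(permits, 10)); // true  (capacity available again)
    }
}
```

With the defaults shown in the config class (`Integer.MAX_VALUE` permits and a `Long.MAX_VALUE` timeout), this gate is effectively disabled, which preserves the sink's previous behavior.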
[GitHub] zentol commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS
zentol commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS URL: https://github.com/apache/flink/pull/6777#discussion_r242898360 ## File path: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ## @@ -120,6 +120,9 @@ /** This determines if incremental checkpointing is enabled. */ private final TernaryBoolean enableIncrementalCheckpointing; + /** Thread number used to download from DFS when restore, default value: 1. */ + private int numberOfRestoringThreads; Review comment: i think this can also be final This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (FLINK-11198) Access to MetricGroup in an AggregateFunction(Non Rich)
[ https://issues.apache.org/jira/browse/FLINK-11198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hequn Cheng reassigned FLINK-11198: --- Assignee: Hequn Cheng > Access to MetricGroup in an AggregateFunction(Non Rich) > --- > > Key: FLINK-11198 > URL: https://issues.apache.org/jira/browse/FLINK-11198 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Affects Versions: 1.6.2 >Reporter: Chirag Dewan >Assignee: Hequn Cheng >Priority: Major > > The only way to add custom metrics from a UDF is through the RuntimeContext, and > the RuntimeContext is wired into every RichFunction implementation. > However, for aggregate() on a windowed stream, we cannot use the Rich version > of AggregateFunction. As far as I understand, this is done to avoid exposing > state in the aggregate UDF. > But can we have some minimal context which does not expose state but provides > metrics, classloader, etc. in the UDF? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] dawidwys commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
dawidwys commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r242894460 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ## @@ -169,33 +173,66 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig } } - @Override - public JobExecutionResult execute(String jobName) throws ProgramInvocationException { - StreamGraph streamGraph = getStreamGraph(); - streamGraph.setJobName(jobName); - transformations.clear(); - return executeRemotely(streamGraph, jarFiles); + /** +* Set savepoint restore settings that will be used when executing the job. +* @param savepointRestoreSettings savepoint restore settings +*/ + public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) { + this.savepointRestoreSettings = savepointRestoreSettings; Review comment: Personally I do prefer the method. With the constructor your original concern still stands: > Savepoint restore should be configured per-execution basis and not for the lifetime of the RemoteStreamEnvironment This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers. URL: https://github.com/apache/flink/pull/7313#issuecomment-448570597 Actually, the only scenario where the randomness in the tmp files can help is a "split-brain" scenario. In this scenario, some TaskManagers (TMs) are considered down but are not. So the job restarts, but the old TMs keep on processing data. In this case, the old TMs will not be able to commit any data, as they have **no** JobManager (JM) and thus no checkpoints, but they will keep writing to temporary files. Now the new TMs will also be writing to tmp files (and then committing them). If there is no randomness, then old and new TMs may be writing to the same temp files (if the FS allows it). That said, I am not so sure whether this scenario can happen, _i.e._ whether TMs can keep running without a JM, and I am not sure whether handling it at the sink is enough. @StephanEwen and @aljoscha ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u commented on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
kl0u commented on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers. URL: https://github.com/apache/flink/pull/7313#issuecomment-448570597 Actually, the only scenario where the randomness in the tmp files can help is a "split-brain" scenario. In this scenario, some TaskManagers (TMs) are considered down but are not. So the job restarts, but the old TMs keep on processing data. In this case, the old TMs will not be able to commit any data, as they have **no** JobManager (JM) and thus no checkpoints, but they will keep writing to temporary files. Now the new TMs will also be writing to tmp files (and then committing them). If there is no randomness, then old and new TMs may be writing to the same temp files (if the FS allows it). That said, I am not so sure whether this scenario can happen, _i.e._ whether TMs can keep running without a JM. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers. URL: https://github.com/apache/flink/pull/7313#issuecomment-448570597 Actually, the only scenario where the randomness in the tmp files can help is a "split-brain" scenario. In this scenario, some TaskManagers (TMs) are considered down but are not. So the job restarts, but the old TMs keep on processing data. In this case, the old TMs will not be able to commit any data, as they have **no** JobManager (JM) and thus no checkpoints, but they will keep writing to temporary files. Now the new TMs will also be writing to tmp files (and then committing them). If there is no randomness, then old and new TMs may be writing to the same temp files (if the FS allows it). That said, I am not so sure whether this scenario can happen, _i.e._ whether TMs can keep running without a JM. @StephanEwen ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers. URL: https://github.com/apache/flink/pull/7313#issuecomment-448570597 Actually, the only scenario where the randomness in the tmp files can help is a "split-brain" scenario. In this scenario, some TaskManagers (TMs) are considered down but are not. So the job restarts, but the old TMs keep on processing data. In this case, the old TMs will not be able to commit any data, as they have **no** JobManager (JM) and thus no checkpoints, but they will keep writing to temporary files. Now the new TMs will also be writing to tmp files (and then committing them). If there is no randomness, then old and new TMs may be writing to the same temp files (if the FS allows it). That said, I am not so sure whether this scenario can happen, _i.e._ whether TMs can keep running without a JM, and I am not sure whether handling it at the sink is enough. @StephanEwen ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers. URL: https://github.com/apache/flink/pull/7313#issuecomment-448519244 Thanks for the review @igalshilman ! I am not sure of the reason for introducing the randomness in the first place. I would assume it was mainly extra cautiousness against overwriting already existing data, although in the case of `HDFS` and the `local` FS this is guaranteed (for committed data) by the naming convention used. Maybe @StephanEwen, who introduced this, has something more to add. Apart from that, I integrated your comments. Let me know if it looks ok now. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
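For context, the deterministic alternative to random temp-file names discussed in this PR derives a file's name solely from stable identifiers (for example the subtask index and a part counter), so a restarted subtask deterministically targets its own files again. A sketch under that assumption — the exact naming format below is illustrative, not the connector's real convention:

```java
public class PartFileNaming {

    // In-progress files are hidden (dot-prefixed) and named only from
    // stable identifiers, with no random component: the same subtask
    // and part counter always map to the same temporary file.
    static String inProgressName(int subtaskIndex, long partCounter) {
        return String.format(".part-%d-%d.inprogress", subtaskIndex, partCounter);
    }

    // On commit, the temporary prefix and suffix are stripped; committed
    // names never collide because the (subtask, counter) pair is unique.
    static String committedName(int subtaskIndex, long partCounter) {
        return String.format("part-%d-%d", subtaskIndex, partCounter);
    }
}
```

This is exactly why the split-brain concern above matters: without a random component, an orphaned old subtask and its replacement compute the same in-progress name.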
[jira] [Commented] (FLINK-11201) flink-test-utils dependency issue
[ https://issues.apache.org/jira/browse/FLINK-11201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724931#comment-16724931 ] eugen yushin commented on FLINK-11201: -- It works like a charm in 1.6, so I think something changed in 1.7 regarding dependency management for flink-test-utils. Btw, the class from the error, `org.apache.flink.runtime.testutils.MiniClusterResource`, comes from the test sources of flink-runtime, not the main sources. > flink-test-utils dependency issue > - > > Key: FLINK-11201 > URL: https://issues.apache.org/jira/browse/FLINK-11201 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.0 >Reporter: eugen yushin >Priority: Major > > Starting with Flink 1.7, the > `runtime.testutils.MiniClusterResource` class is missing from the `flink-test-utils` > distribution. > Steps to reproduce (Scala code) > build.sbt > {code} > name := "flink-17-test-issue" > organization := "x.y.z" > scalaVersion := "2.11.12" > val flinkVersion = "1.7.0" > libraryDependencies ++= Seq( > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided, > "org.scalatest" %% "scalatest" % "3.0.5" % Test, > "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test > // ,"org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier > Artifact.TestsClassifier > ) > {code} > test class: > {code} > class SimpleTest extends AbstractTestBase with FlatSpecLike { > implicit val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > env.setParallelism(1) > env.setRestartStrategy(RestartStrategies.noRestart()) > "SimpleTest" should "work" in { > val inputDs = env.fromElements(1,2,3) > inputDs.print() > env.execute() > } > } > {code} > Results in: > {code} > A needed class was not found. This could be due to an error in your runpath. 
> Missing class: org/apache/flink/runtime/testutils/MiniClusterResource > java.lang.NoClassDefFoundError: > org/apache/flink/runtime/testutils/MiniClusterResource > ... > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.runtime.testutils.MiniClusterResource > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > ... 31 more > {code} > This can be fixed by adding flink-runtime distribution with test classifier > into dependencies list. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-11180) ProcessFailureCancelingITCase#testCancelingOnProcessFailure
[ https://issues.apache.org/jira/browse/FLINK-11180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-11180. Resolution: Fixed Fix Version/s: 1.8.0 1.7.2 master: b02121c4673f80e606c7211d73de29c7295df7ae 1.7: 4113c6b57a9f9daefc00969b31fd202b02c4ac3f > ProcessFailureCancelingITCase#testCancelingOnProcessFailure > --- > > Key: FLINK-11180 > URL: https://issues.apache.org/jira/browse/FLINK-11180 > Project: Flink > Issue Type: Sub-task > Components: Tests >Affects Versions: 1.7.0 >Reporter: sunjincheng >Assignee: Hequn Cheng >Priority: Major > Labels: pull-request-available > Fix For: 1.7.2, 1.8.0 > > Time Spent: 20m > Remaining Estimate: 0h > > tag: release-1.7.1-rc2 > org.apache.flink.util.FlinkException: Could not create the > DispatcherResourceManagerComponent. > at > org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:242) > at > org.apache.flink.test.recovery.ProcessFailureCancelingITCase.testCancelingOnProcessFailure(ProcessFailureCancelingITCase.java:148) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at 
org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) > at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > Caused by: java.net.BindException: Address already in use > at sun.nio.ch.Net.bind0(Native Method) > at sun.nio.ch.Net.bind(Net.java:433) > at sun.nio.ch.Net.bind(Net.java:425) > at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223) > at > org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1358) > at > 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486) > at > org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:1019) > at > org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254) > at > org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:366) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) > at >
[GitHub] zentol closed pull request #7327: [FLINK-11180][tests] Fix ProcessFailureCancelingITCase.testCancelingOnProcessFailure error by specifying a free port
zentol closed pull request #7327: [FLINK-11180][tests] Fix ProcessFailureCancelingITCase.testCancelingOnProcessFailure error by specifying a free port URL: https://github.com/apache/flink/pull/7327 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java index ed987d677d8..96e289dbf5c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java @@ -32,6 +32,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.clusterframework.ApplicationStatus; @@ -116,6 +117,7 @@ public void testCancelingOnProcessFailure() throws Exception { config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2); config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m"); config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100); + config.setInteger(RestOptions.PORT, 0); final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config); final int jobManagerPort = rpcService.getPort(); This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
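The one-line fix in the PR above works because requesting port `0` delegates port selection to the operating system, which hands out a free ephemeral port, so concurrently running tests can no longer collide on a fixed REST port. The same mechanism in plain JDK terms:

```java
import java.io.IOException;
import java.net.ServerSocket;

public class EphemeralPortDemo {

    /** Bind to port 0 and return the OS-assigned ephemeral port. */
    static int bindEphemeral() {
        try (ServerSocket socket = new ServerSocket(0)) {
            // getLocalPort() reports the concrete port the OS picked
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Instead of failing with "Address already in use" on a busy fixed
        // port, binding to 0 always succeeds as long as any port is free.
        System.out.println(bindEphemeral() > 0); // true
    }
}
```

Setting `RestOptions.PORT` to `0` applies the same trick to Flink's REST endpoint in the test configuration.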
[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r242884502 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ## @@ -169,33 +173,66 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig } } - @Override - public JobExecutionResult execute(String jobName) throws ProgramInvocationException { - StreamGraph streamGraph = getStreamGraph(); - streamGraph.setJobName(jobName); - transformations.clear(); - return executeRemotely(streamGraph, jarFiles); + /** +* Set savepoint restore settings that will be used when executing the job. +* @param savepointRestoreSettings savepoint restore settings +*/ + public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) { + this.savepointRestoreSettings = savepointRestoreSettings; Review comment: Unless you prefer the method instead of the constructor, then this should be ready to be merged :) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore URL: https://github.com/apache/flink/pull/7249#discussion_r242884359 ## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java ## @@ -169,33 +173,66 @@ public RemoteStreamEnvironment(String host, int port, Configuration clientConfig } } - @Override - public JobExecutionResult execute(String jobName) throws ProgramInvocationException { - StreamGraph streamGraph = getStreamGraph(); - streamGraph.setJobName(jobName); - transformations.clear(); - return executeRemotely(streamGraph, jarFiles); + /** +* Set savepoint restore settings that will be used when executing the job. +* @param savepointRestoreSettings savepoint restore settings +*/ + public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) { + this.savepointRestoreSettings = savepointRestoreSettings; Review comment: To be fair the code above has been changed, it's not a setter anymore but set in the constructor. You summed up quite nicely what is actually the state of this PR now. Nothing is broken in terms of backwards-compatibility but the structure changed a bit which makes it hard to review. The only difference is that an additional constructor has been added for the savepoint settings instead of this method you proposed: ``` public JobExecutionResult execute(String jobName, SavepointRestoreSettings savepointRestoreSettings) throws ProgramInvocationException { ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
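The two API shapes being weighed in this thread — savepoint settings fixed at construction time versus supplied per `execute` call — can be contrasted with a toy sketch. This is a hypothetical class, not Flink's actual `RemoteStreamEnvironment`, and a plain string stands in for `SavepointRestoreSettings`:

```java
public class ToyEnvironment {

    // Constructor-based shape: the restore setting holds for the
    // lifetime of the environment.
    private final String defaultSavepointPath;

    public ToyEnvironment(String defaultSavepointPath) {
        this.defaultSavepointPath = defaultSavepointPath;
    }

    // Method-based shape: a per-execution override, the alternative
    // proposed in review. Passing null falls back to the constructor value.
    public String execute(String jobName, String savepointPathOverride) {
        String effective = savepointPathOverride != null ? savepointPathOverride : defaultSavepointPath;
        return effective == null
            ? jobName + ": fresh start"
            : jobName + ": restore from " + effective;
    }
}
```

The per-execution parameter addresses the original concern quoted above — restore settings configured for one run should not silently leak into the next run on the same environment.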
[GitHub] dawidwys closed pull request #7321: [hotfix] Fix typo in AbstractStreamOperator
dawidwys closed pull request #7321: [hotfix] Fix typo in AbstractStreamOperator URL: https://github.com/apache/flink/pull/7321 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index d9a195cb235..fa5231ec1ed 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -639,7 +639,7 @@ public Object getCurrentKey() { if (keyedStateBackend != null) { return keyedStateBackend.getCurrentKey(); } else { - throw new UnsupportedOperationException("Key can only be retrieven on KeyedStream."); + throw new UnsupportedOperationException("Key can only be retrieved on KeyedStream."); } } This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10672) Task stuck while writing output to flink
[ https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724901#comment-16724901 ] Maximilian Michels commented on FLINK-10672: No, [~thw] just set the Fix Version to empty because this is not fixed in 1.5.6. > Task stuck while writing output to flink > > > Key: FLINK-10672 > URL: https://issues.apache.org/jira/browse/FLINK-10672 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.4 > Environment: OS: Debian rodente 4.17 > Flink version: 1.5.4 > ||Key||Value|| > |jobmanager.heap.mb|1024| > |jobmanager.rpc.address|localhost| > |jobmanager.rpc.port|6123| > |metrics.reporter.jmx.class|org.apache.flink.metrics.jmx.JMXReporter| > |metrics.reporter.jmx.port|9250-9260| > |metrics.reporters|jmx| > |parallelism.default|1| > |rest.port|8081| > |taskmanager.heap.mb|1024| > |taskmanager.numberOfTaskSlots|1| > |web.tmpdir|/tmp/flink-web-bdb73d6c-5b9e-47b5-9ebf-eed0a7c82c26| > > h1. Overview > ||Data Port||All Slots||Free Slots||CPU Cores||Physical Memory||JVM Heap > Size||Flink Managed Memory|| > |43501|1|0|12|62.9 GB|922 MB|642 MB| > h1. Memory > h2. JVM (Heap/Non-Heap) > ||Type||Committed||Used||Maximum|| > |Heap|922 MB|575 MB|922 MB| > |Non-Heap|68.8 MB|64.3 MB|-1 B| > |Total|991 MB|639 MB|922 MB| > h2. Outside JVM > ||Type||Count||Used||Capacity|| > |Direct|3,292|105 MB|105 MB| > |Mapped|0|0 B|0 B| > h1. Network > h2. Memory Segments > ||Type||Count|| > |Available|3,194| > |Total|3,278| > h1. Garbage Collection > ||Collector||Count||Time|| > |G1_Young_Generation|13|336| > |G1_Old_Generation|1|21| >Reporter: Ankur Goenka >Priority: Major > Labels: beam > Attachments: 1uruvakHxBu.png, 3aDKQ24WvKk.png, Po89UGDn58V.png, > jmx_dump.json, jmx_dump_detailed.json, jstack_129827.log, jstack_163822.log, > jstack_66985.log > > > I am running a fairly complex pipeline with 200+ tasks. > The pipeline works fine with small data (order of 10kb input) but gets stuck > with slightly larger data (300kb input). 
> > The task gets stuck while writing the output to Flink, more specifically it > gets stuck while requesting a memory segment in the local buffer pool. The Task > manager UI shows that it has enough memory and memory segments to work with. > The relevant stack trace is > {quote}"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 > tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9] > java.lang.Thread.State: TIMED_WAITING (on object monitor) > at (C/C++) 0x7fef201c7dae (Unknown Source) > at (C/C++) 0x7fef1f2aea07 (Unknown Source) > at (C/C++) 0x7fef1f241cd3 (Unknown Source) > at java.lang.Object.wait(Native Method) > - waiting on <0xf6d56450> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247) > - locked <0xf6d56450> (a java.util.ArrayDeque) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) > at > org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42) > at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26) > at > org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80) > at > org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) > 
at > org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230) > - locked <0xf6a60bd0> (a java.lang.Object) > at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81) > at > org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32) > at > org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139) > at > org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125) > at >
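The stack trace above is parked in LocalBufferPool.requestMemorySegment, i.e. the writer is waiting for a free network buffer to be recycled. That blocking pattern can be sketched with a small, self-contained simulation. This is illustrative only, not Flink's actual implementation; the class and method names below are made up to mirror the frames in the trace:

```python
# Illustrative sketch of a bounded buffer pool: a producer requesting a
# segment blocks until a consumer recycles one. If nothing downstream makes
# progress, the producer waits indefinitely, which is the symptom described
# in this issue. Not Flink's real code.
import threading
from collections import deque

class LocalBufferPoolSketch:
    def __init__(self, num_segments):
        self._segments = deque(range(num_segments))  # ids of free segments
        self._cond = threading.Condition()

    def request_memory_segment(self, timeout=None):
        """Blocks (like the TIMED_WAITING frame in the trace) until a free
        segment exists; returns None if the wait times out."""
        with self._cond:
            while not self._segments:
                if not self._cond.wait(timeout=timeout):
                    return None  # timed out: no segment was recycled
            return self._segments.popleft()

    def recycle(self, segment):
        """Called by the consumer side; wakes up a blocked producer."""
        with self._cond:
            self._segments.append(segment)
            self._cond.notify()
```

With a single-segment pool, a second request returns only after `recycle` is called, which is the analogue of the task above waiting on the ArrayDeque of available segments.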
[GitHub] dawidwys closed pull request #7335: Release 1.7
dawidwys closed pull request #7335: Release 1.7 URL: https://github.com/apache/flink/pull/7335 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-11186) Event-time balancing for multiple Kafka partitions
[ https://issues.apache.org/jira/browse/FLINK-11186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724899#comment-16724899 ] Tom Schamberger commented on FLINK-11186: - [~Paul Lin] The proposed source interface in FLIP-27 is an important precondition for being able to align the event time of different partitions within individual source subtasks. But if the parallelism of the source operator is greater than one, the source subtasks have to somehow communicate how far each partition should be allowed to progress. This could be done by back-pressure produced by succeeding operators.
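The alignment being proposed in this thread, pausing partitions whose event time runs too far ahead of the slowest partition, can be sketched as a small simulation. This is a conceptual illustration and not Flink API code; the `max_ahead` threshold and all names are assumptions for the sketch:

```python
# Conceptual sketch of per-partition event-time alignment (not Flink code).
# A partition is paused while its local watermark is more than `max_ahead`
# ahead of the slowest active partition, so slow partitions can catch up.

def aligned_read_order(partitions, max_ahead):
    """partitions: dict of name -> ascending list of event timestamps.
    Returns the (partition, timestamp) order of consumption under alignment."""
    watermarks = {p: float("-inf") for p in partitions}
    positions = {p: 0 for p in partitions}
    consumed = []
    while any(positions[p] < len(evts) for p, evts in partitions.items()):
        # Minimum watermark over partitions that still have data.
        global_min = min(
            watermarks[p] for p, evts in partitions.items()
            if positions[p] < len(evts)
        )
        progressed = False
        for p, evts in partitions.items():
            if positions[p] >= len(evts):
                continue  # partition exhausted
            # Pause partitions that are too far ahead of the slowest one.
            if watermarks[p] != float("-inf") and watermarks[p] - global_min > max_ahead:
                continue
            ts = evts[positions[p]]
            positions[p] += 1
            watermarks[p] = ts  # local watermark follows the last event time
            consumed.append((p, ts))
            progressed = True
        if not progressed:  # safety net; the min-watermark partition always runs
            break
    return consumed
```

Running this with one fast and one slow partition shows the fast partition being stalled until the slow one catches up, which is exactly the behaviour the issue asks for.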
[jira] [Commented] (FLINK-11201) flink-test-utils dependency issue
[ https://issues.apache.org/jira/browse/FLINK-11201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724897#comment-16724897 ] Chesnay Schepler commented on FLINK-11201: -- flink-test-utils has a dependency on flink-runtime, including the test-jar. I don't understand why this isn't pulled in; is this an sbt thing? > flink-test-utils dependency issue > - > > Key: FLINK-11201 > URL: https://issues.apache.org/jira/browse/FLINK-11201 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.0 >Reporter: eugen yushin >Priority: Major > > Starting with Flink 1.7, there's lack of > `runtime.testutils.MiniClusterResource` class in `flink-test-utils` > distribution. > Steps to reproduce (Scala code) > build.sbt > {code} > name := "flink-17-test-issue" > organization := "x.y.z" > scalaVersion := "2.11.12" > val flinkVersion = "1.7.0" > libraryDependencies ++= Seq( > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided, > "org.scalatest" %% "scalatest" % "3.0.5" % Test, > "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test > // ,"org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier > Artifact.TestsClassifier > ) > {code} > test class: > {code} > class SimpleTest extends AbstractTestBase with FlatSpecLike { > implicit val env: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > env.setParallelism(1) > env.setRestartStrategy(RestartStrategies.noRestart()) > "SimpleTest" should "work" in { > val inputDs = env.fromElements(1,2,3) > inputDs.print() > env.execute() > } > } > {code} > Results in: > {code} > A needed class was not found. This could be due to an error in your runpath. > Missing class: org/apache/flink/runtime/testutils/MiniClusterResource > java.lang.NoClassDefFoundError: > org/apache/flink/runtime/testutils/MiniClusterResource > ... 
> Caused by: java.lang.ClassNotFoundException: > org.apache.flink.runtime.testutils.MiniClusterResource > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > ... 31 more > {code} > This can be fixed by adding the flink-runtime artifact with the test > classifier to the dependency list. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
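The workaround the report describes is already hinted at by the commented-out line in the quoted build.sbt: declare flink-runtime's test-jar explicitly. A sketch of the added dependency, assuming the same build.sbt as above:

```scala
// Workaround from the report (sketch): pull in flink-runtime's test-jar
// explicitly, since sbt does not resolve the transitive test-jar that
// flink-test-utils 1.7.x expects.
libraryDependencies +=
  "org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier Artifact.TestsClassifier
```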
[jira] [Commented] (FLINK-11194) missing Scala 2.12 build of HBase connector
[ https://issues.apache.org/jira/browse/FLINK-11194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724896#comment-16724896 ] Chesnay Schepler commented on FLINK-11194: -- Hmm... so it's not that flink-hbase was not released for Scala 2.12; the issue is that the published pom does not explicitly refer to Scala 2.12, but to the scala version property instead: {code} <artifactId>flink-hbase_${scala.binary.version}</artifactId> <name>flink-hbase</name> <packaging>jar</packaging> {code} The reason for this is simple; the shade-plugin is completely disabled in this module. Let's see what happens if we just enable it again... > missing Scala 2.12 build of HBase connector > > > Key: FLINK-11194 > URL: https://issues.apache.org/jira/browse/FLINK-11194 > Project: Flink > Issue Type: Bug > Components: Batch Connectors and Input/Output Formats, Build System >Affects Versions: 1.7.0 > Environment: Scala version 2.12.7 > Flink version 1.7.0 >Reporter: Zhenhao Li >Assignee: vinoyang >Priority: Major > Labels: artifact, build, hbase, scala > > See the following SBT log. > ``` > [error] (update) sbt.librarymanagement.ResolveException: unresolved > dependency: org.apache.flink#flink-hbase_2.12;1.7.0: Resolution failed > several times for dependency: org.apache.flink#flink-hbase_2.12;1.7.0 > \{compile=[default(compile)]}:: > [error] java.text.ParseException: inconsistent module descriptor file > found in > 'https://repo1.maven.org/maven2/org/apache/flink/flink-hbase_2.12/1.7.0/flink-hbase_2.12-1.7.0.pom': > bad module name: expected='flink-hbase_2.12' found='flink-hbase_2.11'; > [error] java.text.ParseException: inconsistent module descriptor file > found in > 'https://repo1.maven.org/maven2/org/apache/flink/flink-hbase_2.12/1.7.0/flink-hbase_2.12-1.7.0.pom': > bad module name: expected='flink-hbase_2.12' found='flink-hbase_2.11'; > ``` -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-11198) Access to MetricGroup in an AggregateFunction(Non Rich)
[ https://issues.apache.org/jira/browse/FLINK-11198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-11198: - Fix Version/s: (was: 1.6.3) > Access to MetricGroup in an AggregateFunction(Non Rich) > --- > > Key: FLINK-11198 > URL: https://issues.apache.org/jira/browse/FLINK-11198 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Affects Versions: 1.6.2 >Reporter: Chirag Dewan >Priority: Major > > The only way to add custom metrics from a UDF is through the RuntimeContext. And, > the RuntimeContext is wired into every RichFunction implementation. > However, for aggregate() on a windowed stream, we cannot use the rich version > of AggregateFunction. As I understand, this is done to avoid exposing the > state in the aggregate UDF. > But can we have some minimal context which does not expose state but provides > metrics, classloader, etc. in the UDF? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
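The "minimal context" this issue asks for can be sketched language-agnostically: the window operator hands the UDF a narrow interface that exposes metrics but no state. The snippet below is a hypothetical illustration; none of these class or method names exist in Flink:

```python
# Hypothetical sketch of the feature request: a non-rich aggregate function
# receives a minimal context exposing metrics (and similar services) without
# any access to runtime state. All names are illustrative, not Flink API.

class Counter:
    def __init__(self):
        self.count = 0
    def inc(self, n=1):
        self.count += n

class MinimalContext:
    """Exposes metric registration only; no keyed state, no state backends."""
    def __init__(self):
        self._metrics = {}
    def counter(self, name):
        return self._metrics.setdefault(name, Counter())

class AveragingAggregate:
    """A stateless aggregate function that still gets a metrics handle."""
    def open(self, ctx):
        self.records_in = ctx.counter("recordsIn")
    def create_accumulator(self):
        return (0, 0)  # (sum, count)
    def add(self, value, acc):
        self.records_in.inc()
        return (acc[0] + value, acc[1] + 1)
    def get_result(self, acc):
        return acc[0] / acc[1]
```

The point of the design is that the context can be handed to any UDF, rich or not, because it cannot leak the operator's state, only register metrics.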
[jira] [Assigned] (FLINK-11194) missing Scala 2.12 build of HBase connector
[ https://issues.apache.org/jira/browse/FLINK-11194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reassigned FLINK-11194: Assignee: Chesnay Schepler (was: vinoyang)