[jira] [Commented] (FLINK-11186) Event-time balancing for multiple Kafka partitions

2018-12-19 Thread Paul Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16725627#comment-16725627
 ] 

Paul Lin commented on FLINK-11186:
--

[~tschamberger] Your reasoning is right. For the discussion about state sharing 
between subtasks, please refer to 
[http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Sharing-state-between-subtasks-td24489.html].

> Event-time balancing for multiple Kafka partitions
> --
>
> Key: FLINK-11186
> URL: https://issues.apache.org/jira/browse/FLINK-11186
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, Kafka Connector
>Reporter: Tom Schamberger
>Priority: Major
>
> Currently, it is not possible with Flink to back-pressure individual Kafka 
> partitions that are faster in terms of event-time. This leads to 
> unnecessary memory consumption and can lead to deadlocks under 
> back-pressure.
> When multiple Kafka topics are consumed, downstream event-time window 
> operators have to wait until the last Kafka partition has produced a 
> sufficient watermark before they can be triggered. If individual Kafka 
> partitions differ in read performance, or the event-time of messages within 
> partitions is not monotonically distributed, this can lead to a situation 
> where 'fast' partitions (those whose event-time makes fast progress) outrun 
> slower partitions until back-pressure prevents all partitions from being 
> consumed further. This deadlocks the application.
> I suggest that windows should be able to back-pressure individual 
> partitions that progress faster in terms of event-time, so that slow 
> partitions can keep up.
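As background for the proposal, here is a minimal sketch of how watermarks are
assigned per Kafka partition today, which is why the overall watermark is the
minimum across partitions and one slow partition holds every window back. The
topic, broker address, and CSV timestamp parsing are illustrative assumptions,
not taken from the issue:

{code}
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object PerPartitionWatermarks {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // illustrative broker

    val consumer = new FlinkKafkaConsumer[String]("events", new SimpleStringSchema(), props)

    // Assigning the extractor on the consumer (not on the stream) makes Flink
    // track one watermark per Kafka partition; the consumer emits the minimum
    // of those watermarks, so windows only fire once the slowest partition
    // has caught up -- the behaviour the issue wants to exploit for balancing.
    consumer.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(5)) {
        override def extractTimestamp(element: String): Long =
          element.split(",")(0).toLong // illustrative: first CSV field is the event time
      })

    env.addSource(consumer).print()
    env.execute("per-partition watermarks sketch")
  }
}
{code}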



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11174) flink Metrics Prometheus labels support chinese

2018-12-19 Thread lamber-ken (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16725618#comment-16725618
 ] 

lamber-ken edited comment on FLINK-11174 at 12/20/18 6:38 AM:
--

Hi [~fanweiwen], you can follow 
[https://flink.apache.org/how-to-contribute.html] to submit your PR for this 
issue.

From my side, this is a good idea.

 


was (Author: lamber-ken):
hi, [~fanweiwen], you can follow 
[https://flink.apache.org/how-to-contribute.html] to submit your pr about this 
issue

 

> flink Metrics Prometheus labels support chinese
> ---
>
> Key: FLINK-11174
> URL: https://issues.apache.org/jira/browse/FLINK-11174
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Fan weiwen
>Assignee: TisonKun
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2018-12-17-17-17-05-965.png
>
>
> I use Flink metrics with Prometheus.
> My job name is Chinese, but
> org.apache.flink.metrics.prometheus.AbstractPrometheusReporter#replaceInvalidChars
> only supports [a-zA-Z0-9:_], so every character of my job name gets replaced.
>  
> I think it is fine that label keys are restricted to [a-zA-Z0-9:_],
> but could label values support Chinese?
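For reference, a minimal sketch of the sanitization described above; the
character class is the one named in the report, while the method body and the
example below are assumptions, not Flink's actual source:

{code}
object LabelSanitizer {
  // Assumed shape of AbstractPrometheusReporter#replaceInvalidChars: every
  // character outside [a-zA-Z0-9:_] becomes an underscore.
  def replaceInvalidChars(input: String): String =
    input.replaceAll("[^a-zA-Z0-9:_]", "_")

  def main(args: Array[String]): Unit = {
    // A Chinese job name collapses entirely to underscores:
    println(replaceInvalidChars("我的作业")) // prints "____"
    // Prometheus itself only restricts label *names*; label *values* may be
    // any UTF-8 string, so relaxing the filter for values would be safe.
  }
}
{code}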



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11174) flink Metrics Prometheus labels support chinese

2018-12-19 Thread lamber-ken (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16725618#comment-16725618
 ] 

lamber-ken commented on FLINK-11174:


Hi [~fanweiwen], you can follow 
[https://flink.apache.org/how-to-contribute.html] to submit your PR for this 
issue.

 

> flink Metrics Prometheus labels support chinese
> ---
>
> Key: FLINK-11174
> URL: https://issues.apache.org/jira/browse/FLINK-11174
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Fan weiwen
>Assignee: TisonKun
>Priority: Minor
>  Labels: pull-request-available
> Attachments: image-2018-12-17-17-17-05-965.png
>
>
> I use Flink metrics with Prometheus.
> My job name is Chinese, but
> org.apache.flink.metrics.prometheus.AbstractPrometheusReporter#replaceInvalidChars
> only supports [a-zA-Z0-9:_], so every character of my job name gets replaced.
>  
> I think it is fine that label keys are restricted to [a-zA-Z0-9:_],
> but could label values support Chinese?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-12-19 Thread GitBox
klion26 commented on issue #6777: [FLINK-10461] [State Backends, Checkpointing] 
Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#issuecomment-448885155
 
 
   @azagrebin @zentol thank you for the review. I've just updated the code, 
leaving the logic of `getNumberOfRestoringThreads` unchanged, in sync with the 
logic for `enableIncrementalCheckpointing` for now.
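For readers following along, a sketch of the idea behind the PR: downloading
the state files on a fixed-size thread pool instead of sequentially.
`downloadOne`, `downloadAll`, and the URI-based fetching are illustrative
assumptions, not the PR's actual API:

{code}
import java.net.URI
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.util.concurrent.Executors

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object ParallelRestoreDownload {
  // Illustrative helper: fetch one remote state file to a local path.
  def downloadOne(remote: URI, local: Path): Unit = {
    val in = remote.toURL.openStream()
    try Files.copy(in, local, StandardCopyOption.REPLACE_EXISTING)
    finally in.close()
  }

  // Download all files concurrently; restore time is then bounded by the
  // slowest single file rather than by the sum of all files.
  def downloadAll(remoteFiles: Seq[URI], targetDir: Path, numThreads: Int): Unit = {
    val pool = Executors.newFixedThreadPool(numThreads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val downloads = remoteFiles.map { remote =>
        Future(downloadOne(remote, targetDir.resolve(Paths.get(remote.getPath).getFileName)))
      }
      Await.result(Future.sequence(downloads), Duration.Inf)
    } finally {
      pool.shutdown()
    }
  }
}
{code}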


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dianfu commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction

2018-12-19 Thread GitBox
dianfu commented on a change in pull request #7237: [FLINK-7209] [table] 
Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r243160395
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala
 ##
 @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class NullSerializer extends TypeSerializerSingleton[Any] {
 
 Review comment:
   What about only adding 
UserDefinedFunctionUtilTest#testRemoveStateViewFieldsFromAccTypeInfo in this 
PR? Just like you said, the tests of other methods of UserDefinedFunctionUtil 
can be done in another JIRA if needed.
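For context on the class under discussion, a sketch of the shape such a no-op
serializer can take against Flink 1.7's serializer interface. The body is
assumed, not copied from the PR; only the class header appears in the diff
above:

{code}
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
import org.apache.flink.core.memory.{DataInputView, DataOutputView}

class NullSerializer extends TypeSerializerSingleton[Any] {
  override def isImmutableType: Boolean = true
  override def createInstance(): Any = null
  override def copy(from: Any): Any = null
  override def copy(from: Any, reuse: Any): Any = null
  override def getLength: Int = 1

  // Only a placeholder byte is written: with state-backed views the actual
  // map/list content lives in the state backend, not in the accumulator --
  // which is why, as discussed above, removing it doesn't break correctness.
  override def serialize(record: Any, target: DataOutputView): Unit =
    target.writeByte(0)

  override def deserialize(source: DataInputView): Any = {
    source.readByte()
    null
  }

  override def deserialize(reuse: Any, source: DataInputView): Any =
    deserialize(source)

  override def copy(source: DataInputView, target: DataOutputView): Unit =
    target.writeByte(source.readByte())

  override def canEqual(obj: Any): Boolean = obj.isInstanceOf[NullSerializer]
}
{code}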


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink

2018-12-19 Thread GitBox
yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] 
[kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r243132804
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/DynamodbStreamsProxy.java
 ##
 @@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.proxy;
+
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.ClientConfigurationFactory;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import 
com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
+import com.amazonaws.services.kinesis.AmazonKinesis;
+import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
+import com.amazonaws.services.kinesis.model.LimitExceededException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import com.amazonaws.services.kinesis.model.Shard;
+import com.amazonaws.services.kinesis.model.StreamStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static 
org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_ENDPOINT;
+import static 
org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants.AWS_REGION;
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE;
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_DYNAMODB_STREAMS_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT;
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DEFAULT_DYNAMODB_STREAMS_DESCRIBE_BACKOFF_MAX;
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE;
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DYNAMODB_STREAMS_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT;
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.DYNAMODB_STREAMS_DESCRIBE_BACKOFF_MAX;
+import static 
org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.getCredentialsProvider;
+import static 
org.apache.flink.streaming.connectors.kinesis.util.AWSUtil.setAwsClientConfigProperties;
+
+/**
+ * DynamoDB streams proxy: interface interacting with the DynamoDB streams.
+ */
+public class DynamodbStreamsProxy extends KinesisProxy {
+   private static final Logger LOG = 
LoggerFactory.getLogger(DynamodbStreamsProxy.class);
+
+   /** Used for formatting Flink-specific user agent string when creating 
Kinesis client. */
+   private static final String USER_AGENT_FORMAT = "Apache Flink %s (%s) 
DynamoDB Streams Connector";
+
+   // Backoff millis for the describe stream operation.
+   private final long describeStreamBaseBackoffMillis;
+   // Maximum backoff millis for the describe stream operation.
+   private final long describeStreamMaxBackoffMillis;
+   // Exponential backoff power constant for the describe stream operation.
+   private final double describeStreamExpConstant;
+
+   protected DynamodbStreamsProxy(Properties configProps) {
+   super(configProps);
+
+   // parse properties
+   describeStreamBaseBackoffMillis = Long.valueOf(
+   
configProps.getProperty(DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE,
+   
Long.toString(DEFAULT_DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE)));
+   describeStreamMaxBackoffMillis = Long.valueOf(
+   
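The digest cuts the diff off above. For orientation, a sketch of the
full-jitter exponential backoff that these base/max/exponential-constant
properties typically feed (modeled on the Kinesis connector's retry loops; the
method name and jitter strategy are assumptions):

{code}
import scala.util.Random

object DescribeStreamBackoff {
  // Full-jitter backoff: sleep a random duration in [0, min(max, base * power^attempt)).
  def fullJitterBackoff(baseMillis: Long, maxMillis: Long, power: Double, attempt: Int): Long = {
    val exponential = math.min(maxMillis.toDouble, baseMillis * math.pow(power, attempt))
    (Random.nextDouble() * exponential).toLong
  }

  def main(args: Array[String]): Unit = {
    // e.g. base = 1000 ms, max = 5000 ms, exponential constant = 1.5
    (0 to 4).foreach { attempt =>
      println(s"attempt $attempt -> sleep ${fullJitterBackoff(1000L, 5000L, 1.5, attempt)} ms")
    }
  }
}
{code}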

[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction

2018-12-19 Thread GitBox
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] 
Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r243154482
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala
 ##
 @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class NullSerializer extends TypeSerializerSingleton[Any] {
 
 Review comment:
   Hmm, I think that's a good idea, but it should probably be addressed in another 
JIRA. I actually have [PR:6472](https://github.com/apache/flink/pull/6472) 
still pending, and I am also very fond of the idea of adding tests for the 
utility class.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11077) Make subtask aware of the timeout of checkpoint and abort the current ongoing asynccheckpoint

2018-12-19 Thread aitozi (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

aitozi closed FLINK-11077.
--
Resolution: Won't Fix

> Make subtask aware of the timeout of checkpoint and abort the current ongoing 
> asynccheckpoint
> -
>
> Key: FLINK-11077
> URL: https://issues.apache.org/jira/browse/FLINK-11077
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.8.0
>Reporter: aitozi
>Assignee: aitozi
>Priority: Major
>
> Currently, the checkpoint coordinator cancels the checkpoint when it times 
> out, but the subtask will continue to execute the async/sync parts of the 
> timed-out checkpoint. When the checkpoint state is large, this is a waste of 
> bandwidth. So I think the AsyncCheckpointRunnable should be made aware of the 
> timed-out checkpoint and interrupt it. What do you think [~StephanEwen] 
> [~srichter]
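A minimal sketch of the proposed behaviour, as a generic illustration rather
than Flink's actual AsyncCheckpointRunnable: the coordinator flips a flag when
the checkpoint times out, and the async part checks it between upload steps so
it stops wasting bandwidth:

{code}
import java.util.concurrent.atomic.AtomicBoolean

class CancellableAsyncCheckpoint(uploadSteps: Seq[() => Unit]) extends Runnable {
  private val cancelled = new AtomicBoolean(false)

  // Called by the coordinator (or a local timer) when the checkpoint times out.
  def abortOnTimeout(): Unit = cancelled.set(true)

  override def run(): Unit = {
    val steps = uploadSteps.iterator
    // Re-check the flag between steps: a timed-out checkpoint stops uploading
    // the remaining state instead of completing a checkpoint nobody will use.
    while (steps.hasNext && !cancelled.get()) {
      steps.next().apply()
    }
  }
}
{code}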



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dianfu commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction

2018-12-19 Thread GitBox
dianfu commented on a change in pull request #7237: [FLINK-7209] [table] 
Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r243143806
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala
 ##
 @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class NullSerializer extends TypeSerializerSingleton[Any] {
 
 Review comment:
   The tests can still pass without NullSerializer because NullSerializer is 
an optimization; it doesn't affect correctness, so an IT case cannot cover 
this kind of test. Maybe we should add a unit test for 
UserDefinedFunctionUtil#removeStateViewFieldsFromAccTypeInfo. But I found that 
currently there is no unit test for UserDefinedFunctionUtil at all. What do you 
think about adding a unit test class for UserDefinedFunctionUtil?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 edited a comment on issue #6236: [FLINK-9699] [table] Add api to replace registered table

2018-12-19 Thread GitBox
hequn8128 edited a comment on issue #6236: [FLINK-9699] [table] Add api to 
replace registered table
URL: https://github.com/apache/flink/pull/6236#issuecomment-448821407
 
 
   Hi @bowenli86 , great to have your comments from the Catalog perspective. 
I took a look at the Catalog API, and it seems the internal replace flag may 
fit well with it. 
   The flag can be replaced by `dropTable(ignoreIfNotExists=true) + 
createTable` once the temp tables registered via TableEnvironment have been 
moved into an in-memory catalog.
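A sketch of the replacement pattern the comment describes, against a
hypothetical catalog interface (the trait below is illustrative, not Flink's
actual Catalog API):

{code}
// Hypothetical minimal catalog surface, just enough to show the pattern.
trait Catalog {
  def dropTable(name: String, ignoreIfNotExists: Boolean): Unit
  def createTable(name: String, table: AnyRef): Unit
}

object ReplaceTable {
  // Replace-registration needs no dedicated flag once temp tables live in a
  // catalog: drop the old table (ignoring a missing one), then create the new.
  def replaceTable(catalog: Catalog, name: String, table: AnyRef): Unit = {
    catalog.dropTable(name, ignoreIfNotExists = true)
    catalog.createTable(name, table)
  }
}
{code}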


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] Clarkkkkk commented on issue #7258: [FLINK-11084]Throw a hard exception to remind developers while there's no stream node between two split transformation

2018-12-19 Thread GitBox
Clarkkkkk commented on issue #7258: [FLINK-11084] Throw a hard exception to 
remind developers while there's no stream node between two split transformation
URL: https://github.com/apache/flink/pull/7258#issuecomment-448830390
 
 
   cc @dawidwys 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] [kinesis] Consuming data from DynamoDB streams to flink

2018-12-19 Thread GitBox
yxu-valleytider commented on a change in pull request #6968: [FLINK-4582] 
[kinesis] Consuming data from DynamoDB streams to flink
URL: https://github.com/apache/flink/pull/6968#discussion_r243132531
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java
 ##
 @@ -143,6 +143,34 @@ public SentinelSequenceNumber toSentinelSequenceNumber() {
/** The interval after which to consider a shard idle for purposes of 
watermark generation. */
public static final String SHARD_IDLE_INTERVAL_MILLIS = 
"flink.shard.idle.interval";
 
+  /**
+* The base backoff time between each describeStream attempt.
+* Different tag name to distinguish from 
"flink.stream.describe.backoff.base"
+* since the latter is deprecated.
+*/
+   public static final String DYNAMODB_STREAMS_DESCRIBE_BACKOFF_BASE =
 
 Review comment:
   OK. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] hequn8128 commented on issue #6236: [FLINK-9699] [table] Add api to replace registered table

2018-12-19 Thread GitBox
hequn8128 commented on issue #6236: [FLINK-9699] [table] Add api to replace 
registered table
URL: https://github.com/apache/flink/pull/6236#issuecomment-448821407
 
 
   Hi @bowenli86 , thanks for your advice. The internal replace flag may fit 
well with the API of Catalog. 
   The flag can be replaced by `dropTable(ignoreIfNotExists=true) + 
createTable` once those temp tables registered via TableEnvironment have been 
moved into an in-memory catalog.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-19 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16725476#comment-16725476
 ] 

lining commented on FLINK-11162:


@Chesnay Schepler, thanks for your reply. Maybe we need to discuss it: as jobs 
become more complex, operator chaining makes it difficult for users to manage 
the job.

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: lining
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The scene of this issue:
> We are using the indicator variables of the operator.
> We have customized the display of the indicators. For query purposes, we 
> currently lack an interface to get all the logical operators of a job. The 
> current REST API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] aljoscha commented on a change in pull request #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.

2018-12-19 Thread GitBox
aljoscha commented on a change in pull request #7313: 
[FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
URL: https://github.com/apache/flink/pull/7313#discussion_r243039964
 
 

 ##
 File path: 
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
 ##
 @@ -121,24 +127,14 @@ public boolean supportsResume() {
return true;
}
 
-   @VisibleForTesting
-   static org.apache.hadoop.fs.Path generateStagingTempFilePath(
-   org.apache.hadoop.fs.FileSystem fs,
-   org.apache.hadoop.fs.Path targetFile) throws 
IOException {
-
+   private static org.apache.hadoop.fs.Path 
generateStagingTempFilePath(org.apache.hadoop.fs.Path targetFile) {
checkArgument(targetFile.isAbsolute(), "targetFile must be 
absolute");
 
final org.apache.hadoop.fs.Path parent = targetFile.getParent();
final String name = targetFile.getName();
 
checkArgument(parent != null, "targetFile must not be the root 
directory");
 
-   while (true) {
-   org.apache.hadoop.fs.Path candidate = new 
org.apache.hadoop.fs.Path(
-   parent, "." + name + ".inprogress." + 
UUID.randomUUID().toString());
-   if (!fs.exists(candidate)) {
-   return candidate;
-   }
-   }
+   return new org.apache.hadoop.fs.Path(parent, "." + name + 
".inprogress");
 
 Review comment:
   We should definitely wait for @StephanEwen to comment on this (as you 
already discussed in the comments), since he's the original author and there 
might be a good reason behind this.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] joerg84 commented on issue #7338: Fixed comment about scala versions.

2018-12-19 Thread GitBox
joerg84 commented on issue #7338: Fixed comment about scala versions.
URL: https://github.com/apache/flink/pull/7338#issuecomment-448706656
 
 
   > This is incorrect as Flink 1.7 also builds with scala 2.12.
 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bowenli86 commented on issue #6236: [FLINK-9699] [table] Add api to replace registered table

2018-12-19 Thread GitBox
bowenli86 commented on issue #6236: [FLINK-9699] [table] Add api to replace 
registered table
URL: https://github.com/apache/flink/pull/6236#issuecomment-448693612
 
 
   I got your points. Sounds good. Thanks


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction

2018-12-19 Thread GitBox
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] 
Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r243014619
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ##
 @@ -489,6 +491,88 @@ object UserDefinedFunctionUtils {
   )
 }
 
+def decorateDataViewTypeInfo(
+fieldTypeInfo: TypeInformation[_],
+fieldInstance: AnyRef,
+field: Field): (TypeInformation[_], Option[DataViewSpec[_]]) = 
fieldTypeInfo match {
+  case ct: CompositeType[_] if includesDataView(ct) =>
+throw new TableException(
+  "MapView and ListView only supported at first level of accumulators 
of Pojo, Tuple " +
+"and Case Class type.")
+  case map: MapViewTypeInfo[_, _] =>
+val mapView = fieldInstance.asInstanceOf[MapView[_, _]]
+val newTypeInfo = if (mapView != null && mapView.keyTypeInfo != null &&
+  mapView.valueTypeInfo != null) {
+  new MapViewTypeInfo(mapView.keyTypeInfo, mapView.valueTypeInfo)
+} else {
+  map
+}
+
+if (isStateBackedDataViews) {
+  newTypeInfo.nullSerializer = true
+
+  // create map view specs with unique id (used as state name)
+  val fieldName = field.getName
+  var spec = MapViewSpec(
+"agg" + index + "$" + fieldName,
+field,
+newTypeInfo)
+
+  (newTypeInfo, Some(spec))
+} else {
+  (newTypeInfo, None)
+}
+
+  case list: ListViewTypeInfo[_] =>
 
 Review comment:
   Sorry, I was not making it clear. I meant maybe we can add a `Tuple` test case.
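For illustration, a sketch of the kind of `Tuple` case being asked for: an
AggregateFunction whose accumulator is a Java Tuple2 carrying a MapView at the
first level. Class and field names are assumed; the PR's actual tests may
differ:

{code}
import java.lang.{Long => JLong}

import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.table.functions.AggregateFunction

// Counts distinct string values; the MapView sits at the first level of the
// Tuple accumulator, which is exactly what the PR adds support for.
class CountDistinct extends AggregateFunction[JLong, JTuple2[MapView[String, JLong], JLong]] {

  override def createAccumulator(): JTuple2[MapView[String, JLong], JLong] =
    new JTuple2(new MapView[String, JLong](), 0L)

  def accumulate(acc: JTuple2[MapView[String, JLong], JLong], value: String): Unit = {
    if (!acc.f0.contains(value)) {
      acc.f0.put(value, 1L)
      acc.f1 = acc.f1 + 1L
    }
  }

  override def getValue(acc: JTuple2[MapView[String, JLong], JLong]): JLong = acc.f1
}
{code}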


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr commented on a change in pull request #7237: [FLINK-7209] [table] Support DataView in Tuple and Case Class as the ACC type of AggregateFunction

2018-12-19 Thread GitBox
walterddr commented on a change in pull request #7237: [FLINK-7209] [table] 
Support DataView in Tuple and Case Class as the ACC type of AggregateFunction
URL: https://github.com/apache/flink/pull/7237#discussion_r243015377
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/dataview/NullSerializer.scala
 ##
 @@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.dataview
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
+class NullSerializer extends TypeSerializerSingleton[Any] {
 
 Review comment:
   Yeah, I think I also traced the same code path for Pojo. However, removing 
the `NullSerializer` still lets the newly modified 
`testGroupAggregateWithStateBackend` pass. This is worrying, because 
essentially there's no test safeguarding this piece of code from being removed 
in the future.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] XuQianJin-Stars commented on issue #7318: [FLINK-11099][Table API] Migrate flink-table runtime CRow Types classes

2018-12-19 Thread GitBox
XuQianJin-Stars commented on issue #7318: [FLINK-11099][Table API] Migrate 
flink-table runtime  CRow  Types classes
URL: https://github.com/apache/flink/pull/7318#issuecomment-448658898
 
 
   @twalthr should I update the branch or do a force push? 
   thanks
   qianjin


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] aljoscha commented on a change in pull request #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.

2018-12-19 Thread GitBox
aljoscha commented on a change in pull request #7313: 
[FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.
URL: https://github.com/apache/flink/pull/7313#discussion_r242982233
 
 

 ##
 File path: 
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
 ##
 @@ -60,6 +60,12 @@ public 
HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {
@Override
public RecoverableFsDataOutputStream open(Path filePath) throws 
IOException {
final org.apache.hadoop.fs.Path targetFile = 
HadoopFileSystem.toHadoopPath(filePath);
+
+   // the finalized part must not exist already
+   if (fs.exists(targetFile)) {
 
 Review comment:
   Maybe put a more elaborate exception message that indicates that this is a 
bug, because I think this shouldn't happen, or you're in some deeper problem.
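A sketch of the more elaborate check and message being asked for (the wording
is assumed):

{code}
import java.io.IOException

import org.apache.hadoop.fs.{FileSystem, Path}

object PartFileChecks {
  // Fail fast if the finalized part file already exists; the message makes
  // clear that this indicates a bug rather than a routine condition.
  def checkTargetDoesNotExist(fs: FileSystem, targetFile: Path): Unit = {
    if (fs.exists(targetFile)) {
      throw new IOException(
        s"Target file $targetFile already exists. A RecoverableWriter must never " +
          "re-open a finalized part file; this indicates a bug in the sink or a " +
          "second writer publishing to the same path.")
    }
  }
}
{code}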


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u removed a comment on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.

2018-12-19 Thread GitBox
kl0u removed a comment on issue #7313: [FLINK-11116][fs-connector] Removed 
randomness from HDFS and Local fs writers.
URL: https://github.com/apache/flink/pull/7313#issuecomment-448653987
 
 
   Yes, this is why I said "if the FS allows it". Thanks for the review 
@igalshilman ! I will merge...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u commented on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.

2018-12-19 Thread GitBox
kl0u commented on issue #7313: [FLINK-11116][fs-connector] Removed randomness 
from HDFS and Local fs writers.
URL: https://github.com/apache/flink/pull/7313#issuecomment-448653987
 
 
   Yes, this is why I said "if the FS allows it". Thanks for the review 
@igalshilman ! I will merge...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u commented on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.

2018-12-19 Thread GitBox
kl0u commented on issue #7313: [FLINK-11116][fs-connector] Removed randomness 
from HDFS and Local fs writers.
URL: https://github.com/apache/flink/pull/7313#issuecomment-448653989
 
 
   Yes, this is why I said "if the FS allows it". Thanks for the review 
@igalshilman ! I will merge...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11143) AskTimeoutException is thrown during job submission and completion

2018-12-19 Thread Alex Vinnik (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16725145#comment-16725145
 ] 

Alex Vinnik commented on FLINK-11143:
-

[~QiLuo] we tried it, but it didn't solve the problem. Thanks for the suggestion.

> AskTimeoutException is thrown during job submission and completion
> --
>
> Key: FLINK-11143
> URL: https://issues.apache.org/jira/browse/FLINK-11143
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.6.2
>Reporter: Alex Vinnik
>Priority: Major
>
> For more details please see the thread
> [http://mail-archives.apache.org/mod_mbox/flink-user/201812.mbox/%3cc2fb26f9-1410-4333-80f4-34807481b...@gmail.com%3E]
> On submission 
> 2018-12-12 02:28:31 ERROR JobsOverviewHandler:92 - Implementation error: 
> Unhandled exception.
>  akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/dispatcher#225683351]] after [1 ms]. 
> Sender[null] sent message of type 
> "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
>  at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)
>  at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
>  at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
>  at 
> scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
>  at 
> scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
>  at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
>  at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
>  at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
>  at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
>  at java.lang.Thread.run(Thread.java:748)
>  
> On completion
>  
> {"errors":["Internal server error."," side:\njava.util.concurrent.CompletionException: 
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/dispatcher#105638574]] after [1 ms]. 
> Sender[null] sent message of type 
> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".
> at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at 
> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
> at akka.dispatch.OnComplete.internal(Future.scala:258)
> at akka.dispatch.OnComplete.internal(Future.scala:256)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> at 
> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> at akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:603)
> at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
> at 
> scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
> at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:109)
> at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
> at 
> akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
> at 
> akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
> at java.lang.Thread.run(Thread.java:748)\nCaused by: 
> akka.pattern.AskTimeoutException: Ask timed out on 
> [Actor[akka://flink/user/dispatcher#105638574]] after [1 ms]. 
> Sender[null] sent message of type 
> \"org.apache.flink.runtime.rpc.messages.LocalFencedMessage\".
> at 
> akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:604)\n\t...
>  9 more\n\nEnd of exception on server side>"]}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11201) flink-test-utils dependency issue

2018-12-19 Thread eugen yushin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16725130#comment-16725130
 ] 

eugen yushin commented on FLINK-11201:
--

At the same time, this looks like the sbt issue mentioned in:
"Transitive dependencies with classifier "test" are not included in the classpath" 
#2964
https://github.com/sbt/sbt/issues/2964

Nevertheless, I think it makes sense to add a note to the docs pointing this out 
explicitly, so nobody will spend extra time in the future seeking an explanation 
for this behavior.

> flink-test-utils dependency issue
> -
>
> Key: FLINK-11201
> URL: https://issues.apache.org/jira/browse/FLINK-11201
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: eugen yushin
>Priority: Major
>
> Starting with Flink 1.7, the `runtime.testutils.MiniClusterResource` class is 
> missing from the `flink-test-utils` distribution.
> Steps to reproduce (Scala code)
> build.sbt
> {code}
> name := "flink-17-test-issue"
> organization := "x.y.z"
> scalaVersion := "2.11.12"
> val flinkVersion = "1.7.0"
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>   "org.scalatest" %% "scalatest" % "3.0.5" % Test,
>   "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test
> //  ,"org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier 
> Artifact.TestsClassifier
> )
> {code}
> test class:
> {code}
> class SimpleTest extends AbstractTestBase with FlatSpecLike {
>   implicit val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   env.setParallelism(1)
>   env.setRestartStrategy(RestartStrategies.noRestart())
>   "SimpleTest" should "work" in {
> val inputDs = env.fromElements(1,2,3)
> inputDs.print()
> env.execute()
>   }
> }
> {code}
> Results in:
> {code}
> A needed class was not found. This could be due to an error in your runpath. 
> Missing class: org/apache/flink/runtime/testutils/MiniClusterResource
> java.lang.NoClassDefFoundError: 
> org/apache/flink/runtime/testutils/MiniClusterResource
> ...
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.runtime.testutils.MiniClusterResource
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 31 more
> {code}
> This can be fixed by adding flink-runtime distribution with test classifier 
> into dependencies list.
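The commented-out dependency in the build.sbt above is exactly this fix;
uncommented, the line reads:

{code}
"org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier Artifact.TestsClassifier
{code}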



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9080) Flink Scheduler goes OOM, suspecting a memory leak

2018-12-19 Thread Nawaid Shamim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16725121#comment-16725121
 ] 

Nawaid Shamim edited comment on FLINK-9080 at 12/19/18 3:49 PM:


*Workaround:* Copy Flink job JAR into /usr/lib/flink/lib and AppClassLoader 
will load all the required classes from classpath.



was (Author: nawaidshamim):
*Workaround:* Copy Flink job JAR into /usr/lib/flink/lib and AppClassLoader 
will load all the required classes from classpath.


 !Screenshot 2018-12-18 at 15.47.55.png! 

> Flink Scheduler goes OOM, suspecting a memory leak
> --
>
> Key: FLINK-9080
> URL: https://issues.apache.org/jira/browse/FLINK-9080
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.0
>Reporter: Rohit Singh
>Assignee: Stefan Richter
>Priority: Major
> Attachments: Screenshot 2018-12-18 at 12.14.11.png, Top Level 
> packages.JPG, Top level classes.JPG, classesloaded vs unloaded.png
>
>
> Running Flink version 1.4.0 on Mesos, with the scheduler running alongside 
> the job manager in a single container and the task managers running in 
> separate containers.
> A couple of jobs were running continuously, and the Flink scheduler was 
> working properly along with the task managers. Due to some change in data, 
> one of the jobs started failing continuously. In the meantime, there was a 
> surge in Flink scheduler memory, and the scheduler eventually died of OOM.
>  
> A memory dump analysis was done; the findings were as follows:  !Top Level 
> packages.JPG!!Top level classes.JPG!
>  * The majority of heap retained by the top loaded packages pointed towards 
> FlinkUserClassLoader, Glassfish (Jersey library), and Finalizer classes (top 
> level package image).
>  * The top-level classes were those of FlinkUserClassLoader (top level class 
> image).
>  * The number of classes unloaded was quite low compared to classes loaded 
> (see the attached classes loaded vs. unloaded graph), in spite of adding the 
> JVM options -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled; the 
> scheduler was restarted 3 times.
>  * There were also custom classes that were duplicated during subsequent 
> class uploads.
> Please find attached all the images of the heap dump. Can you suggest some 
> pointers on how to overcome this issue?
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9080) Flink Scheduler goes OOM, suspecting a memory leak

2018-12-19 Thread Nawaid Shamim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16725121#comment-16725121
 ] 

Nawaid Shamim commented on FLINK-9080:
--

*Workaround:* Copy Flink job JAR into /usr/lib/flink/lib and AppClassLoader 
will load all the required classes from classpath.


 !Screenshot 2018-12-18 at 15.47.55.png! 

> Flink Scheduler goes OOM, suspecting a memory leak
> --
>
> Key: FLINK-9080
> URL: https://issues.apache.org/jira/browse/FLINK-9080
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.0
>Reporter: Rohit Singh
>Assignee: Stefan Richter
>Priority: Major
> Attachments: Screenshot 2018-12-18 at 12.14.11.png, Top Level 
> packages.JPG, Top level classes.JPG, classesloaded vs unloaded.png
>
>
> Running Flink version 1.4.0 on Mesos, with the scheduler running alongside 
> the job manager in a single container and the task managers running in 
> separate containers.
> A couple of jobs were running continuously, and the Flink scheduler was 
> working properly along with the task managers. Due to some change in data, 
> one of the jobs started failing continuously. In the meantime, there was a 
> surge in Flink scheduler memory, and the scheduler eventually died of OOM.
>  
> A memory dump analysis was done; the findings were as follows:  !Top Level 
> packages.JPG!!Top level classes.JPG!
>  * The majority of heap retained by the top loaded packages pointed towards 
> FlinkUserClassLoader, Glassfish (Jersey library), and Finalizer classes (top 
> level package image).
>  * The top-level classes were those of FlinkUserClassLoader (top level class 
> image).
>  * The number of classes unloaded was quite low compared to classes loaded 
> (see the attached classes loaded vs. unloaded graph), in spite of adding the 
> JVM options -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled; the 
> scheduler was restarted 3 times.
>  * There were also custom classes that were duplicated during subsequent 
> class uploads.
> Please find attached all the images of the heap dump. Can you suggest some 
> pointers on how to overcome this issue?
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11205) Task Manager Metaspace Memory Leak

2018-12-19 Thread Nawaid Shamim (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nawaid Shamim updated FLINK-11205:
--
Attachment: Screenshot 2018-12-18 at 15.47.55.png

> Task Manager Metaspace Memory Leak 
> ---
>
> Key: FLINK-11205
> URL: https://issues.apache.org/jira/browse/FLINK-11205
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Nawaid Shamim
>Priority: Major
> Attachments: Screenshot 2018-12-18 at 12.14.11.png, Screenshot 
> 2018-12-18 at 15.47.55.png
>
>
> Job restarts cause the task manager to dynamically load duplicate classes. 
> Metaspace is unbounded and grows with every restart. YARN aggressively kills 
> such containers, but the effect immediately shows up on a different task 
> manager, which results in a death spiral.
> Task Manager uses dynamic loader as described in 
> [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html]
> {quote}
> *YARN*
> YARN classloading differs between single job deployments and sessions:
>  * When submitting a Flink job/application directly to YARN (via {{bin/flink 
> run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are 
> started for that job. Those JVMs have both Flink framework classes and user 
> code classes in the Java classpath. That means that there is _no dynamic 
> classloading_ involved in that case.
>  * When starting a YARN session, the JobManagers and TaskManagers are started 
> with the Flink framework classes in the classpath. The classes from all jobs 
> that are submitted against the session are loaded dynamically.
> {quote}
> The above is not entirely true, especially when you set {{-yD 
> classloader.resolve-order=parent-first}}. We also observed the above 
> behaviour when submitting a Flink job/application directly to YARN (via 
> {{bin/flink run -m yarn-cluster ...}}).
> !Screenshot 2018-12-18 at 12.14.11.png!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11205) Task Manager Metaspace Memory Leak

2018-12-19 Thread Nawaid Shamim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16725120#comment-16725120
 ] 

Nawaid Shamim commented on FLINK-11205:
---

*Workaround:* Copy Flink job JAR into /usr/lib/flink/lib and AppClassLoader 
will load all the required classes from classpath.


 !Screenshot 2018-12-18 at 15.47.55.png! 

> Task Manager Metaspace Memory Leak 
> ---
>
> Key: FLINK-11205
> URL: https://issues.apache.org/jira/browse/FLINK-11205
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Nawaid Shamim
>Priority: Major
> Attachments: Screenshot 2018-12-18 at 12.14.11.png, Screenshot 
> 2018-12-18 at 15.47.55.png
>
>
> Job restarts cause the task manager to dynamically load duplicate classes. 
> Metaspace is unbounded and grows with every restart. YARN aggressively kills 
> such containers, but the effect immediately shows up on a different task 
> manager, which results in a death spiral.
> Task Manager uses dynamic loader as described in 
> [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html]
> {quote}
> *YARN*
> YARN classloading differs between single job deployments and sessions:
>  * When submitting a Flink job/application directly to YARN (via {{bin/flink 
> run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are 
> started for that job. Those JVMs have both Flink framework classes and user 
> code classes in the Java classpath. That means that there is _no dynamic 
> classloading_ involved in that case.
>  * When starting a YARN session, the JobManagers and TaskManagers are started 
> with the Flink framework classes in the classpath. The classes from all jobs 
> that are submitted against the session are loaded dynamically.
> {quote}
> The above is not entirely true, especially when you set {{-yD 
> classloader.resolve-order=parent-first}}. We also observed the above 
> behaviour when submitting a Flink job/application directly to YARN (via 
> {{bin/flink run -m yarn-cluster ...}}).
> !Screenshot 2018-12-18 at 12.14.11.png!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11205) Task Manager Metaspace Memory Leak

2018-12-19 Thread Nawaid Shamim (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nawaid Shamim updated FLINK-11205:
--
Attachment: (was: Screenshot 2018-12-18 at 12.12.15.png)

> Task Manager Metaspace Memory Leak 
> ---
>
> Key: FLINK-11205
> URL: https://issues.apache.org/jira/browse/FLINK-11205
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Nawaid Shamim
>Priority: Major
> Attachments: Screenshot 2018-12-18 at 12.14.11.png
>
>
> Job restarts cause the task manager to dynamically load duplicate classes. 
> Metaspace is unbounded and grows with every restart. YARN aggressively kills 
> such containers, but the effect then immediately shows up on a different task 
> manager, which results in a death spiral.
> Task Manager uses the dynamic classloader as described in 
> [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html]
> {quote}
> *YARN*
> YARN classloading differs between single job deployments and sessions:
>  * When submitting a Flink job/application directly to YARN (via {{bin/flink 
> run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are 
> started for that job. Those JVMs have both Flink framework classes and user 
> code classes in the Java classpath. That means that there is _no dynamic 
> classloading_ involved in that case.
>  * When starting a YARN session, the JobManagers and TaskManagers are started 
> with the Flink framework classes in the classpath. The classes from all jobs 
> that are submitted against the session are loaded dynamically.
> {quote}
> The above is not entirely true, especially when you set {{-yD 
> classloader.resolve-order=parent-first}}. We also observed the above 
> behaviour when submitting a Flink job/application directly to YARN (via 
> {{bin/flink run -m yarn-cluster ...}}).
> !Screenshot 2018-12-18 at 12.14.11.png!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11205) Task Manager Metaspace Memory Leak

2018-12-19 Thread Nawaid Shamim (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nawaid Shamim updated FLINK-11205:
--
Attachment: Screenshot 2018-12-18 at 12.12.15.png

> Task Manager Metaspace Memory Leak 
> ---
>
> Key: FLINK-11205
> URL: https://issues.apache.org/jira/browse/FLINK-11205
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Nawaid Shamim
>Priority: Major
> Attachments: Screenshot 2018-12-18 at 12.12.15.png, Screenshot 
> 2018-12-18 at 12.14.11.png
>
>
> Job restarts cause the task manager to dynamically load duplicate classes. 
> Metaspace is unbounded and grows with every restart. YARN aggressively kills 
> such containers, but the effect then immediately shows up on a different task 
> manager, which results in a death spiral.
> Task Manager uses the dynamic classloader as described in 
> [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html]
> {quote}
> *YARN*
> YARN classloading differs between single job deployments and sessions:
>  * When submitting a Flink job/application directly to YARN (via {{bin/flink 
> run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are 
> started for that job. Those JVMs have both Flink framework classes and user 
> code classes in the Java classpath. That means that there is _no dynamic 
> classloading_ involved in that case.
>  * When starting a YARN session, the JobManagers and TaskManagers are started 
> with the Flink framework classes in the classpath. The classes from all jobs 
> that are submitted against the session are loaded dynamically.
> {quote}
> The above is not entirely true, especially when you set {{-yD 
> classloader.resolve-order=parent-first}}. We also observed the above 
> behaviour when submitting a Flink job/application directly to YARN (via 
> {{bin/flink run -m yarn-cluster ...}}).
> !Screenshot 2018-12-18 at 12.14.11.png!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9080) Flink Scheduler goes OOM, suspecting a memory leak

2018-12-19 Thread Nawaid Shamim (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nawaid Shamim updated FLINK-9080:
-
Attachment: Screenshot 2018-12-18 at 12.14.11.png

> Flink Scheduler goes OOM, suspecting a memory leak
> --
>
> Key: FLINK-9080
> URL: https://issues.apache.org/jira/browse/FLINK-9080
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.0
>Reporter: Rohit Singh
>Assignee: Stefan Richter
>Priority: Major
> Attachments: Screenshot 2018-12-18 at 12.14.11.png, Top Level 
> packages.JPG, Top level classes.JPG, classesloaded vs unloaded.png
>
>
> Running Flink version 1.4.0 on Mesos, with the scheduler running along with 
> the job manager in a single container, whereas the task managers run in 
> separate containers.
> A couple of jobs were running continuously, and the Flink scheduler was 
> working properly along with the task managers. Due to some change in the data, 
> one of the jobs started failing continuously. In the meantime, there was a 
> surge in the Flink scheduler's memory usage, and it eventually died of OOM.
>  
> A memory dump analysis was done; the findings were as follows:  !Top Level 
> packages.JPG!!Top level classes.JPG!
>  * The majority of the top loaded packages retaining heap pointed towards 
> FlinkUserClassLoader, Glassfish (Jersey library), and Finalizer classes. (Top 
> level package image)
>  * The top-level classes were of FlinkUserClassLoader. (Top Level class image)
>  * The number of classes loaded vs. unloaded was quite low, in spite of 
> adding the JVM options -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled 
> (see the attached classes loaded vs. unloaded graph); the scheduler was 
> restarted 3 times.
>  * There were custom classes as well which were duplicated during subsequent 
> class uploads.
> Please find attached all the images of the heap dump. Can you suggest some 
> pointers on how to overcome this issue?
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9080) Flink Scheduler goes OOM, suspecting a memory leak

2018-12-19 Thread Nawaid Shamim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725118#comment-16725118
 ] 

Nawaid Shamim commented on FLINK-9080:
--

I noticed a similar issue on job restarts: 
https://issues.apache.org/jira/browse/FLINK-11205


 !Screenshot 2018-12-18 at 12.14.11.png! 

> Flink Scheduler goes OOM, suspecting a memory leak
> --
>
> Key: FLINK-9080
> URL: https://issues.apache.org/jira/browse/FLINK-9080
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.0
>Reporter: Rohit Singh
>Assignee: Stefan Richter
>Priority: Major
> Attachments: Screenshot 2018-12-18 at 12.14.11.png, Top Level 
> packages.JPG, Top level classes.JPG, classesloaded vs unloaded.png
>
>
> Running Flink version 1.4.0 on Mesos, with the scheduler running along with 
> the job manager in a single container, whereas the task managers run in 
> separate containers.
> A couple of jobs were running continuously, and the Flink scheduler was 
> working properly along with the task managers. Due to some change in the data, 
> one of the jobs started failing continuously. In the meantime, there was a 
> surge in the Flink scheduler's memory usage, and it eventually died of OOM.
>  
> A memory dump analysis was done; the findings were as follows:  !Top Level 
> packages.JPG!!Top level classes.JPG!
>  * The majority of the top loaded packages retaining heap pointed towards 
> FlinkUserClassLoader, Glassfish (Jersey library), and Finalizer classes. (Top 
> level package image)
>  * The top-level classes were of FlinkUserClassLoader. (Top Level class image)
>  * The number of classes loaded vs. unloaded was quite low, in spite of 
> adding the JVM options -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled 
> (see the attached classes loaded vs. unloaded graph); the scheduler was 
> restarted 3 times.
>  * There were custom classes as well which were duplicated during subsequent 
> class uploads.
> Please find attached all the images of the heap dump. Can you suggest some 
> pointers on how to overcome this issue?
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11202) Split log file per job

2018-12-19 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725111#comment-16725111
 ] 

vinoyang commented on FLINK-11202:
--

OK, got it. Before I start on this issue, I will provide a design document for 
discussion.

> Split log file per job
> --
>
> Key: FLINK-11202
> URL: https://issues.apache.org/jira/browse/FLINK-11202
> Project: Flink
>  Issue Type: Improvement
>Reporter: chauncy
>Assignee: vinoyang
>Priority: Major
>
> Debugging issues is difficult since Task-/JobManagers create a single log 
> file in standalone cluster environments. I think having a log file for each 
> job would be preferable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10317) Configure Metaspace size by default

2018-12-19 Thread Nawaid Shamim (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725116#comment-16725116
 ] 

Nawaid Shamim commented on FLINK-10317:
---

I noticed a similar issue on job restart: 
https://issues.apache.org/jira/browse/FLINK-11205

 !Screenshot 2018-12-18 at 12.14.11.png! 

> Configure Metaspace size by default
> ---
>
> Key: FLINK-10317
> URL: https://issues.apache.org/jira/browse/FLINK-10317
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
> Attachments: Screenshot 2018-12-18 at 12.14.11.png
>
>
> We should set the size of the JVM Metaspace to a sane default, like  
> {{-XX:MaxMetaspaceSize=256m}}.
> If not set, the JVM off-heap memory will grow indefinitely with repeated 
> classloading and JIT compilation, eventually exceeding the allowed memory on 
> Docker/YARN or similar setups.
> It is hard to come up with a good default; however, I believe the error 
> messages one gets when the metaspace is too small are easy to understand (and 
> easy to act on), while it is very hard to figure out why the memory 
> footprint keeps growing steadily and infinitely.
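
For illustration, a minimal sketch of wiring such a default in through 
{{env.java.opts}} in flink-conf.yaml (the 256m value mirrors the suggestion 
above; treat the exact number as an assumption):

{code}
# flink-conf.yaml -- hypothetical snippet: cap Metaspace for Flink's JVMs so a
# classloading leak fails fast with a clear "OutOfMemoryError: Metaspace"
# instead of slowly exceeding the container's memory limit.
env.java.opts: -XX:MaxMetaspaceSize=256m
{code}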



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10317) Configure Metaspace size by default

2018-12-19 Thread Nawaid Shamim (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nawaid Shamim updated FLINK-10317:
--
Attachment: Screenshot 2018-12-18 at 12.14.11.png

> Configure Metaspace size by default
> ---
>
> Key: FLINK-10317
> URL: https://issues.apache.org/jira/browse/FLINK-10317
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Affects Versions: 1.5.3, 1.6.0, 1.7.0
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
> Attachments: Screenshot 2018-12-18 at 12.14.11.png
>
>
> We should set the size of the JVM Metaspace to a sane default, like  
> {{-XX:MaxMetaspaceSize=256m}}.
> If not set, the JVM off-heap memory will grow indefinitely with repeated 
> classloading and JIT compilation, eventually exceeding the allowed memory on 
> Docker/YARN or similar setups.
> It is hard to come up with a good default; however, I believe the error 
> messages one gets when the metaspace is too small are easy to understand (and 
> easy to act on), while it is very hard to figure out why the memory 
> footprint keeps growing steadily and infinitely.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11202) Split log file per job

2018-12-19 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725108#comment-16725108
 ] 

Chesnay Schepler commented on FLINK-11202:
--

This was brought up before and I can see the point, but we need a design on how 
to implement this that doesn't require a full rewrite of all of our logging 
code.
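
For context, one possible building block (an assumption on my side, not an 
agreed design): route log events by an MDC key that Flink would have to set 
per job, e.g. with Logback's {{SiftingAppender}}:

{code}
<!-- logback.xml sketch; assumes Flink sets the MDC key "jobId" on its threads -->
<appender name="SIFT" class="ch.qos.logback.classic.sift.SiftingAppender">
  <discriminator>
    <key>jobId</key>
    <defaultValue>unknown</defaultValue>
  </discriminator>
  <sift>
    <!-- one file appender is created lazily per distinct jobId value -->
    <appender name="FILE-${jobId}" class="ch.qos.logback.core.FileAppender">
      <file>${log.file}.job-${jobId}.log</file>
      <encoder>
        <pattern>%d{yyyy-MM-dd HH:mm:ss,SSS} %-5level %logger{60} - %msg%n</pattern>
      </encoder>
    </appender>
  </sift>
</appender>
{code}

Propagating that MDC value consistently across Flink's internal threads is 
exactly the part that would need the design work mentioned above.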

> Split log file per job
> --
>
> Key: FLINK-11202
> URL: https://issues.apache.org/jira/browse/FLINK-11202
> Project: Flink
>  Issue Type: Improvement
>Reporter: chauncy
>Assignee: vinoyang
>Priority: Major
>
> Debugging issues is difficult since Task-/JobManagers create a single log 
> file in standalone cluster environments. I think having a log file for each 
> job would be preferable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11201) flink-test-utils dependency issue

2018-12-19 Thread eugen yushin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725115#comment-16725115
 ] 

eugen yushin commented on FLINK-11201:
--

It's a behavior change that is not reflected in the docs.
If the code from the description works fine in your environment, I have no 
questions. But it doesn't on the two dev machines I have.

> flink-test-utils dependency issue
> -
>
> Key: FLINK-11201
> URL: https://issues.apache.org/jira/browse/FLINK-11201
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: eugen yushin
>Priority: Major
>
> Starting with Flink 1.7, the `runtime.testutils.MiniClusterResource` class is 
> missing from the `flink-test-utils` distribution.
> Steps to reproduce (Scala code)
> build.sbt
> {code}
> name := "flink-17-test-issue"
> organization := "x.y.z"
> scalaVersion := "2.11.12"
> val flinkVersion = "1.7.0"
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>   "org.scalatest" %% "scalatest" % "3.0.5" % Test,
>   "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test
> //  ,"org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier 
> Artifact.TestsClassifier
> )
> {code}
> test class:
> {code}
> // Imports added for completeness; the original snippet omitted them.
> import org.apache.flink.api.common.restartstrategy.RestartStrategies
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.test.util.AbstractTestBase
> import org.scalatest.FlatSpecLike
>
> class SimpleTest extends AbstractTestBase with FlatSpecLike {
>   implicit val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   env.setParallelism(1)
>   env.setRestartStrategy(RestartStrategies.noRestart())
>   "SimpleTest" should "work" in {
> val inputDs = env.fromElements(1,2,3)
> inputDs.print()
> env.execute()
>   }
> }
> {code}
> Results in:
> {code}
> A needed class was not found. This could be due to an error in your runpath. 
> Missing class: org/apache/flink/runtime/testutils/MiniClusterResource
> java.lang.NoClassDefFoundError: 
> org/apache/flink/runtime/testutils/MiniClusterResource
> ...
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.runtime.testutils.MiniClusterResource
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 31 more
> {code}
> This can be fixed by adding the flink-runtime distribution with the test 
> classifier to the dependency list, as shown below.
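
For reference, the workaround in build.sbt (this is the line that is commented 
out in the description above):

{code}
// Pull in flink-runtime's test artifact, which contains MiniClusterResource.
libraryDependencies += "org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier Artifact.TestsClassifier
{code}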



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11205) Task Manager Metaspace Memory Leak

2018-12-19 Thread Nawaid Shamim (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nawaid Shamim updated FLINK-11205:
--
Description: 
Job restarts cause the task manager to dynamically load duplicate classes. 
Metaspace is unbounded and grows with every restart. YARN aggressively kills 
such containers, but the effect then immediately shows up on a different task 
manager, which results in a death spiral.

Task Manager uses the dynamic classloader as described in 
[https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html]
{quote}
*YARN*

YARN classloading differs between single job deployments and sessions:
 * When submitting a Flink job/application directly to YARN (via {{bin/flink 
run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are started 
for that job. Those JVMs have both Flink framework classes and user code 
classes in the Java classpath. That means that there is _no dynamic 
classloading_ involved in that case.

 * When starting a YARN session, the JobManagers and TaskManagers are started 
with the Flink framework classes in the classpath. The classes from all jobs 
that are submitted against the session are loaded dynamically.
{quote}

The above is not entirely true, especially when you set {{-yD 
classloader.resolve-order=parent-first}}. We also observed the above 
behaviour when submitting a Flink job/application directly to YARN (via 
{{bin/flink run -m yarn-cluster ...}}).

!Screenshot 2018-12-18 at 12.14.11.png!

  was:
Job restarts cause the task manager to dynamically load duplicate classes. 
Metaspace is unbounded and grows with every restart. YARN aggressively kills 
such containers, but the effect then immediately shows up on a different task 
manager, which results in a death spiral.

!Screenshot 2018-12-18 at 12.14.11.png!thumbnail!

Task Manager uses the dynamic classloader as described in 
[https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html]
{quote}
*YARN*

YARN classloading differs between single job deployments and sessions:
 * When submitting a Flink job/application directly to YARN (via {{bin/flink 
run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are started 
for that job. Those JVMs have both Flink framework classes and user code 
classes in the Java classpath. That means that there is _no dynamic 
classloading_ involved in that case.

 * When starting a YARN session, the JobManagers and TaskManagers are started 
with the Flink framework classes in the classpath. The classes from all jobs 
that are submitted against the session are loaded dynamically.
{quote}

The above is not entirely true, especially when you set {{-yD 
classloader.resolve-order=parent-first}}. We also observed the above 
behaviour when submitting a Flink job/application directly to YARN (via 
{{bin/flink run -m yarn-cluster ...}}).



> Task Manager Metaspace Memory Leak 
> ---
>
> Key: FLINK-11205
> URL: https://issues.apache.org/jira/browse/FLINK-11205
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Nawaid Shamim
>Priority: Major
> Attachments: Screenshot 2018-12-18 at 12.14.11.png
>
>
> Job restarts cause the task manager to dynamically load duplicate classes. 
> Metaspace is unbounded and grows with every restart. YARN aggressively kills 
> such containers, but the effect then immediately shows up on a different task 
> manager, which results in a death spiral.
> Task Manager uses the dynamic classloader as described in 
> [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html]
> {quote}
> *YARN*
> YARN classloading differs between single job deployments and sessions:
>  * When submitting a Flink job/application directly to YARN (via {{bin/flink 
> run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are 
> started for that job. Those JVMs have both Flink framework classes and user 
> code classes in the Java classpath. That means that there is _no dynamic 
> classloading_ involved in that case.
>  * When starting a YARN session, the JobManagers and TaskManagers are started 
> with the Flink framework classes in the classpath. The classes from all jobs 
> that are submitted against the session are loaded dynamically.
> {quote}
> The above is not entirely true, especially when you set {{-yD 
> classloader.resolve-order=parent-first}}. We also observed the above 
> behaviour when submitting a Flink job/application directly to YARN (via 
> {{bin/flink run -m yarn-cluster ...}}).
> !Screenshot 2018-12-18 at 12.14.11.png!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11205) Task Manager Metaspace Memory Leak

2018-12-19 Thread Nawaid Shamim (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nawaid Shamim updated FLINK-11205:
--
Description: 
Job restarts cause the task manager to dynamically load duplicate classes. 
Metaspace is unbounded and grows with every restart. YARN aggressively kills 
such containers, but the effect then immediately shows up on a different task 
manager, which results in a death spiral.

!Screenshot 2018-12-18 at 12.14.11.png!thumbnail!

Task Manager uses the dynamic classloader as described in 
[https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html]
{quote}
*YARN*

YARN classloading differs between single job deployments and sessions:
 * When submitting a Flink job/application directly to YARN (via {{bin/flink 
run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are started 
for that job. Those JVMs have both Flink framework classes and user code 
classes in the Java classpath. That means that there is _no dynamic 
classloading_ involved in that case.

 * When starting a YARN session, the JobManagers and TaskManagers are started 
with the Flink framework classes in the classpath. The classes from all jobs 
that are submitted against the session are loaded dynamically.
{quote}

The above is not entirely true, especially when you set {{-yD 
classloader.resolve-order=parent-first}}. We also observed the above 
behaviour when submitting a Flink job/application directly to YARN (via 
{{bin/flink run -m yarn-cluster ...}}).


  was:
Job restarts cause the task manager to dynamically load duplicate classes. 
Metaspace is unbounded and grows with every restart. YARN aggressively kills 
such containers, but the effect then immediately shows up on a different task 
manager, which results in a death spiral.

!Screenshot 2018-12-18 at 12.14.11.png!

Task Manager uses the dynamic classloader as described in 
[https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html]
{quote}
*YARN*

YARN classloading differs between single job deployments and sessions:
 * When submitting a Flink job/application directly to YARN (via {{bin/flink 
run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are started 
for that job. Those JVMs have both Flink framework classes and user code 
classes in the Java classpath. That means that there is _no dynamic 
classloading_ involved in that case.

 * When starting a YARN session, the JobManagers and TaskManagers are started 
with the Flink framework classes in the classpath. The classes from all jobs 
that are submitted against the session are loaded dynamically.
{quote}

The above is not entirely true, especially when you set {{-yD 
classloader.resolve-order=parent-first}}. We also observed the above 
behaviour when submitting a Flink job/application directly to YARN (via 
{{bin/flink run -m yarn-cluster ...}}).



> Task Manager Metaspace Memory Leak 
> ---
>
> Key: FLINK-11205
> URL: https://issues.apache.org/jira/browse/FLINK-11205
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Nawaid Shamim
>Priority: Major
> Attachments: Screenshot 2018-12-18 at 12.14.11.png
>
>
> Job restarts cause the task manager to dynamically load duplicate classes. 
> Metaspace is unbounded and grows with every restart. YARN aggressively kills 
> such containers, but the effect then immediately shows up on a different task 
> manager, which results in a death spiral.
> !Screenshot 2018-12-18 at 12.14.11.png!thumbnail!
> Task Manager uses the dynamic classloader as described in 
> [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html]
> {quote}
> *YARN*
> YARN classloading differs between single job deployments and sessions:
>  * When submitting a Flink job/application directly to YARN (via {{bin/flink 
> run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are 
> started for that job. Those JVMs have both Flink framework classes and user 
> code classes in the Java classpath. That means that there is _no dynamic 
> classloading_ involved in that case.
>  * When starting a YARN session, the JobManagers and TaskManagers are started 
> with the Flink framework classes in the classpath. The classes from all jobs 
> that are submitted against the session are loaded dynamically.
> {quote}
> The above is not entirely true, especially when you set {{-yD 
> classloader.resolve-order=parent-first}}. We also observed the above 
> behaviour when submitting a Flink job/application directly to YARN (via 
> {{bin/flink run -m yarn-cluster ...}}).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11205) Task Manager Metaspace Memory Leak

2018-12-19 Thread Nawaid Shamim (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nawaid Shamim updated FLINK-11205:
--
Description: 
Job restarts cause the task manager to dynamically load duplicate classes. 
Metaspace is unbounded and grows with every restart. YARN aggressively kills 
such containers, but the effect then immediately shows up on a different task 
manager, which results in a death spiral.

!Screenshot 2018-12-18 at 12.14.11.png!

Task Manager uses the dynamic classloader as described in 
[https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html]
{quote}
*YARN*

YARN classloading differs between single job deployments and sessions:
 * When submitting a Flink job/application directly to YARN (via {{bin/flink 
run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are started 
for that job. Those JVMs have both Flink framework classes and user code 
classes in the Java classpath. That means that there is _no dynamic 
classloading_ involved in that case.

 * When starting a YARN session, the JobManagers and TaskManagers are started 
with the Flink framework classes in the classpath. The classes from all jobs 
that are submitted against the session are loaded dynamically.
{quote}

The above is not entirely true, especially when you set {{-yD 
classloader.resolve-order=parent-first}}. We also observed the above 
behaviour when submitting a Flink job/application directly to YARN (via 
{{bin/flink run -m yarn-cluster ...}}).


  was:
Job restarts cause the task manager to dynamically load duplicate classes. 
Metaspace is unbounded and grows with every restart. YARN aggressively kills 
such containers, but the effect then immediately shows up on a different task 
manager, which results in a death spiral.

!Screenshot 2018-12-18 at 12.14.11.png!width=480!

Task Manager uses the dynamic classloader as described in 
[https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html]
{quote}
*YARN*

YARN classloading differs between single job deployments and sessions:
 * When submitting a Flink job/application directly to YARN (via {{bin/flink 
run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are started 
for that job. Those JVMs have both Flink framework classes and user code 
classes in the Java classpath. That means that there is _no dynamic 
classloading_ involved in that case.

 * When starting a YARN session, the JobManagers and TaskManagers are started 
with the Flink framework classes in the classpath. The classes from all jobs 
that are submitted against the session are loaded dynamically.
{quote}

The above is not entirely true, especially when you set {{-yD 
classloader.resolve-order=parent-first}}. We also observed the above 
behaviour when submitting a Flink job/application directly to YARN (via 
{{bin/flink run -m yarn-cluster ...}}).



> Task Manager Metaspace Memory Leak 
> ---
>
> Key: FLINK-11205
> URL: https://issues.apache.org/jira/browse/FLINK-11205
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.5, 1.6.2, 1.7.0
>Reporter: Nawaid Shamim
>Priority: Major
> Attachments: Screenshot 2018-12-18 at 12.14.11.png
>
>
> Job restarts cause the task manager to dynamically load duplicate classes. 
> Metaspace is unbounded and grows with every restart. YARN aggressively kills 
> such containers, but the effect then immediately shows up on a different task 
> manager, which results in a death spiral.
> !Screenshot 2018-12-18 at 12.14.11.png!
> Task Manager uses the dynamic classloader as described in 
> [https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html]
> {quote}
> *YARN*
> YARN classloading differs between single job deployments and sessions:
>  * When submitting a Flink job/application directly to YARN (via {{bin/flink 
> run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are 
> started for that job. Those JVMs have both Flink framework classes and user 
> code classes in the Java classpath. That means that there is _no dynamic 
> classloading_ involved in that case.
>  * When starting a YARN session, the JobManagers and TaskManagers are started 
> with the Flink framework classes in the classpath. The classes from all jobs 
> that are submitted against the session are loaded dynamically.
> {quote}
> The above is not entirely true, especially when you set {{-yD 
> classloader.resolve-order=parent-first}}. We also observed the above 
> behaviour when submitting a Flink job/application directly to YARN (via 
> {{bin/flink run -m yarn-cluster ...}}).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11205) Task Manager Metaspace Memory Leak

2018-12-19 Thread Nawaid Shamim (JIRA)
Nawaid Shamim created FLINK-11205:
-

 Summary: Task Manager Metaspace Memory Leak 
 Key: FLINK-11205
 URL: https://issues.apache.org/jira/browse/FLINK-11205
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.7.0, 1.6.2, 1.5.5
Reporter: Nawaid Shamim
 Attachments: Screenshot 2018-12-18 at 12.14.11.png

Job restarts cause the task manager to dynamically load duplicate classes. 
Metaspace is unbounded and grows with every restart. YARN aggressively kills 
such containers, but the effect then immediately shows up on a different task 
manager, which results in a death spiral.

!Screenshot 2018-12-18 at 12.14.11.png!width=480!

Task Manager uses the dynamic classloader as described in 
[https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/debugging_classloading.html]
{quote}
*YARN*

YARN classloading differs between single job deployments and sessions:
 * When submitting a Flink job/application directly to YARN (via {{bin/flink 
run -m yarn-cluster ...}}), dedicated TaskManagers and JobManagers are started 
for that job. Those JVMs have both Flink framework classes and user code 
classes in the Java classpath. That means that there is _no dynamic 
classloading_ involved in that case.

 * When starting a YARN session, the JobManagers and TaskManagers are started 
with the Flink framework classes in the classpath. The classes from all jobs 
that are submitted against the session are loaded dynamically.
{quote}

The above is not entirely true, especially when you set {{-yD 
classloader.resolve-order=parent-first}}. We also observed the above 
behaviour when submitting a Flink job/application directly to YARN (via 
{{bin/flink run -m yarn-cluster ...}}).
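
For reference, the submission mode referenced above looks like this (the job 
JAR path is a placeholder):

{code}
bin/flink run -m yarn-cluster -yD classloader.resolve-order=parent-first /path/to/my-flink-job.jar
{code}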




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11202) Split log file per job

2018-12-19 Thread vinoyang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-11202:


Assignee: vinoyang

> Split log file per job
> --
>
> Key: FLINK-11202
> URL: https://issues.apache.org/jira/browse/FLINK-11202
> Project: Flink
>  Issue Type: Improvement
>Reporter: chauncy
>Assignee: vinoyang
>Priority: Major
>
> Debugging issues is difficult since Task-/JobManagers create a single log 
> file in standalone cluster environments. I think having a log file for each 
> job would be preferable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11202) Split log file per job

2018-12-19 Thread vinoyang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725083#comment-16725083
 ] 

vinoyang commented on FLINK-11202:
--

[~Zentol] Do you think we should split the log file per job for both the job 
manager and the task manager?

> Split log file per job
> --
>
> Key: FLINK-11202
> URL: https://issues.apache.org/jira/browse/FLINK-11202
> Project: Flink
>  Issue Type: Improvement
>Reporter: chauncy
>Priority: Major
>
> Debugging issues is difficult since Task-/JobManagers create a single log 
> file in standalone cluster environments. I think having a log file for each 
> job would be preferable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol removed a comment on issue #7338: Fixed comment about scala versions.

2018-12-19 Thread GitBox
zentol removed a comment on issue #7338: Fixed comment about scala versions.
URL: https://github.com/apache/flink/pull/7338#issuecomment-448621648
 
 
   This is incorrect as Flink 1.7 also builds with scala 2.12. The entire 
section nee


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on issue #7338: Fixed comment about scala versions.

2018-12-19 Thread GitBox
zentol commented on issue #7338: Fixed comment about scala versions.
URL: https://github.com/apache/flink/pull/7338#issuecomment-448621648
 
 
   This is incorrect as Flink 1.7 also builds with scala 2.12. The entire 
section nee


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on issue #7338: Fixed comment about scala versions.

2018-12-19 Thread GitBox
zentol commented on issue #7338: Fixed comment about scala versions.
URL: https://github.com/apache/flink/pull/7338#issuecomment-448621675
 
 
   This is incorrect as Flink 1.7 also builds with scala 2.12.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11202) Split log file per job

2018-12-19 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11202:
-
Description: Debugging issues is difficult since Task-/JobManagers create a 
single log file in standalone cluster environments. I think having a log file 
for each job would be preferable.  (was: find bug is difficult due totask 
manager  and job manager's  log into one big log file  with standalone cluster 
env  , i think each job has a log file is profile )

> Split log file per job
> --
>
> Key: FLINK-11202
> URL: https://issues.apache.org/jira/browse/FLINK-11202
> Project: Flink
>  Issue Type: Improvement
>Reporter: chauncy
>Priority: Major
>
> Debugging issues is difficult since Task-/JobManagers create a single log 
> file in standalone cluster environments. I think having a log file for each 
> job would be preferable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11202) Split log file per job

2018-12-19 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11202:
-
Summary: Split log file per job  (was: log split file, every job has one 
log file )

> Split log file per job
> --
>
> Key: FLINK-11202
> URL: https://issues.apache.org/jira/browse/FLINK-11202
> Project: Flink
>  Issue Type: Improvement
>Reporter: chauncy
>Priority: Major
>
> Finding bugs is difficult because the task manager's and job manager's logs 
> go into one big log file in a standalone cluster environment; I think having 
> a log file per job would be preferable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-19 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725045#comment-16725045
 ] 

Chesnay Schepler commented on FLINK-11162:
--

No. As I said _before_ you opened your PR, I don't think we're willing to do 
these changes right now.

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: lining
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The scenario for this issue:
> We are using the indicator variables of the operator: , 
> .
> We have customized the display of the indicators. For query purposes, we 
> currently lack an interface to get all the logical operators of a job; the 
> current REST API only provides the chained node information.
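
For context, a sketch of the gap: the existing plan endpoint returns the 
chained job graph only (URL shape from the standard REST API; host and job id 
are placeholders):

{code}
# Returns chained nodes, not the individual logical operators inside a chain:
curl http://<jobmanager-host>:8081/jobs/<job-id>/plan
{code}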



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11200) Port DataView classes to flink-table-common

2018-12-19 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725040#comment-16725040
 ] 

Hequn Cheng commented on FLINK-11200:
-

[~twalthr] Makes sense, I will try to follow your idea. 

> Port DataView classes to flink-table-common
> ---
>
> Key: FLINK-11200
> URL: https://issues.apache.org/jira/browse/FLINK-11200
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Hequn Cheng
>Priority: Major
>
> {{DataView}} classes are used within aggregate functions for more efficient 
> state management. Logically, they should have been ported in FLINK-10689.
> This issue only includes {{org.apache.flink.table.api.dataview.*}} and 
> {{org.apache.flink.table.dataview.StateListView/StateMapView}}. The latter 
> is shared between the planning and runtime phases.
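
For readers unfamiliar with the classes under discussion, a hedged sketch of 
how a {{DataView}} surfaces to users (simplified; signatures may differ 
between versions):

{code}
import org.apache.flink.table.api.dataview.ListView

// A ListView field inside an AggregateFunction accumulator is transparently
// backed by state at runtime, which is why the planning/runtime split of
// these classes is delicate.
class MyAccumulator {
  var items: ListView[String] = new ListView[String]()
  var count: Long = 0L
}
{code}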



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on issue #7318: [FLINK-11099][Table API] Migrate flink-table runtime CRow Types classes

2018-12-19 Thread GitBox
twalthr commented on issue #7318: [FLINK-11099][Table API] Migrate 
flink-table runtime  CRow  Types classes
URL: https://github.com/apache/flink/pull/7318#issuecomment-448610899
 
 
   @XuQianJin-Stars no need for a new PR, just update the branch or do a force 
push. Thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-11200) Port DataView classes to flink-table-common

2018-12-19 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725032#comment-16725032
 ] 

Timo Walther edited comment on FLINK-11200 at 12/19/18 2:14 PM:


[~hequn8128] You are right. I hadn't seen the transitive class dependencies. 
It would be great if we could find a better solution with cleaner interfaces 
than just porting {{ListViewTypeInfo}} and {{ListViewSerializer}} into 
{{table-common}}. In my opinion, those classes don't fit 100% into a module 
that is shared across a lot of Flink modules. But if you find no cleaner way 
while maintaining backwards compatibility, feel free to port them as well.


was (Author: twalthr):
You are right. I hadn't seen the transitive class dependencies. It would be 
great if we could find a better solution with cleaner interfaces than just 
porting {{ListViewTypeInfo}} and {{ListViewSerializer}} into {{table-common}}. 
In my opinion, those classes don't fit 100% into a module that is shared across 
a lot of Flink modules. But if you find no cleaner way while maintaining 
backwards compatibility, feel free to port them as well.

> Port DataView classes to flink-table-common
> ---
>
> Key: FLINK-11200
> URL: https://issues.apache.org/jira/browse/FLINK-11200
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Hequn Cheng
>Priority: Major
>
> {{DataView}} classes are used within aggregate functions for more efficient 
> state management. Logically, they should have been ported in FLINK-10689.
> This issue only includes {{org.apache.flink.table.api.dataview.*}} and 
> {{org.apache.flink.table.dataview.StateListView/StateMapView}}. The latter 
> is shared between the planning and runtime phases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11200) Port DataView classes to flink-table-common

2018-12-19 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16725032#comment-16725032
 ] 

Timo Walther commented on FLINK-11200:
--

You are right. I hadn't seen the transitive class dependencies. It would be 
great if we could find a better solution with cleaner interfaces than just 
porting {{ListViewTypeInfo}} and {{ListViewSerializer}} into {{table-common}}. 
In my opinion, those classes don't fit 100% into a module that is shared across 
a lot of Flink modules. But if you find no cleaner way while maintaining 
backwards compatibility, feel free to port them as well.

> Port DataView classes to flink-table-common
> ---
>
> Key: FLINK-11200
> URL: https://issues.apache.org/jira/browse/FLINK-11200
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Hequn Cheng
>Priority: Major
>
> {{DataView}} classes are used within aggregate functions for more efficient 
> state management. Logically, they should have been ported in FLINK-10689.
> This issue only includes {{org.apache.flink.table.api.dataview.*}} and 
> {{org.apache.flink.table.dataview.StateListView/StateMapView}}. The latter 
> is shared between the planning and runtime phases.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] igalshilman commented on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.

2018-12-19 Thread GitBox
igalshilman commented on issue #7313: [FLINK-11116][fs-connector] Removed 
randomness from HDFS and Local fs writers.
URL: https://github.com/apache/flink/pull/7313#issuecomment-448601803
 
 
Good point @kl0u; however, in HDFS there would be a single writer at a time, 
protected by the writer lease.
Thanks for addressing the review comments. I think the tests can be made a 
bit more readable, but we may want to follow up on this as part of a different 
issue.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11204) flink start-scala-shell.sh do not work in security mode with kerberos authentication.

2018-12-19 Thread Jeff Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724997#comment-16724997
 ] 

Jeff Zhang commented on FLINK-11204:


Do you run kinit before starting the Scala shell?
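
For completeness, besides a ticket cache obtained via kinit, Flink can also 
pick up a keytab from flink-conf.yaml; a minimal sketch (paths and principal 
are placeholders):

{code}
# flink-conf.yaml -- hypothetical values
security.kerberos.login.use-ticket-cache: true
security.kerberos.login.keytab: /path/to/user.keytab
security.kerberos.login.principal: user@EXAMPLE.COM
{code}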

> flink start-scala-shell.sh do not work in security mode with kerberos 
> authentication.
> -
>
> Key: FLINK-11204
> URL: https://issues.apache.org/jira/browse/FLINK-11204
> Project: Flink
>  Issue Type: Bug
>  Components: Scala Shell
>Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2, 1.7.0
>Reporter: kelun wang
>Priority: Minor
>
> Hello,
> When using flink start-scala-shell.sh in a cluster with Kerberos credentials, 
> the script does not support Kerberos authentication; errors like the ones 
> below will occur:
> 1) Fails to deploy the YARN cluster.
>  
> {code:java}
> start-scala-shell.sh yarn -n 3
> Exception in thread "main" java.lang.RuntimeException: Error deploying the 
> YARN cluster
> at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:594)
> at 
> org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:81)
> at 
> org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:256)
> at 
> org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:165)
> at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:189)
> at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:188)
> at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:137)
> at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
> Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm
> at 
> org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)
> at 
> org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:318)
> at 
> org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:303)
> at 
> org.apache.hadoop.security.UserGroupInformation.isAuthenticationMethodEnabled(UserGroupInformation.java:391)
> at 
> org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:385)
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:384)
> at 
> org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:351)
> ... 8 more
> {code}
>  
> 2) Fails to fetch an already-deployed Flink cluster when using the following 
> command:
> bin/start-scala-shell.sh yarn
>  
> {code:java}
> def fetchDeployedYarnClusterInfo(
> configuration: Configuration,
> configurationDirectory: String) = {
> val args = ArrayBuffer[String](
> "-m", "yarn-cluster"
> )
> {code}
> When fetching the deployed YARN cluster with the param "-m yarn-cluster", it 
> will create a new one, but since it has no "-n", it still fails.
>  
>   
>   



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] joerg84 commented on issue #7338: Fixed comment about scala versions.

2018-12-19 Thread GitBox
joerg84 commented on issue #7338: Fixed comment about scala versions.
URL: https://github.com/apache/flink/pull/7338#issuecomment-448596345
 
 
   @fhueske PTAL


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] joerg84 opened a new pull request #7338: Fixed comment about scala versions.

2018-12-19 Thread GitBox
joerg84 opened a new pull request #7338: Fixed comment about scala versions.
URL: https://github.com/apache/flink/pull/7338
 
 
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] jinglining commented on issue #7325: [FLINK-11162][rest] Provide a rest API to list all logical operators

2018-12-19 Thread GitBox
jinglining commented on issue #7325: [FLINK-11162][rest] Provide a rest API to 
list all logical operators
URL: https://github.com/apache/flink/pull/7325#issuecomment-448594791
 
 
@zentol, can you review it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11162) Provide a rest API to list all logical operators

2018-12-19 Thread lining (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724995#comment-16724995
 ] 

lining commented on FLINK-11162:


[~Zentol] I have pushed the code; can you review it?

> Provide a rest API to list all logical operators
> 
>
> Key: FLINK-11162
> URL: https://issues.apache.org/jira/browse/FLINK-11162
> Project: Flink
>  Issue Type: New Feature
>  Components: REST
>Reporter: vinoyang
>Assignee: lining
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> The scenario for this issue:
> We are using the indicator variables of the operator: , 
> .
> We have customized the display of the indicators. For query purposes, we 
> currently lack an interface to get all the logical operators of a job; the 
> current REST API only provides the chained node information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11204) flink start-scala-shell.sh do not work in security mode with kerberos authentication.

2018-12-19 Thread kelun wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kelun wang updated FLINK-11204:
---
Description: 
Hello,

When using flink start-scala-shell.sh in a cluster with Kerberos credentials, 
the script does not support Kerberos authentication; errors like the ones below 
will occur:

1) Fails to deploy the YARN cluster.

 
{code:java}
start-scala-shell.sh yarn -n 3
Exception in thread "main" java.lang.RuntimeException: Error deploying the YARN 
cluster
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:594)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:81)
at 
org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:256)
at 
org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:165)
at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:189)
at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:188)
at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:137)
at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:318)
at 
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:303)
at 
org.apache.hadoop.security.UserGroupInformation.isAuthenticationMethodEnabled(UserGroupInformation.java:391)
at 
org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:385)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:384)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:351)
... 8 more
{code}
 

2) Fails to fetch an already-deployed Flink cluster when using the following command:

bin/start-scala-shell.sh yarn

 
{code:java}
def fetchDeployedYarnClusterInfo(
configuration: Configuration,
configurationDirectory: String) = {


val args = ArrayBuffer[String](
"-m", "yarn-cluster"
)
{code}
When fetching the deployed YARN cluster with the param "-m yarn-cluster", it 
will create a new one, but since it has no "-n", it still fails.

 
  
  

  was:
Hello,

When using flink start-scala-shell.sh in a cluster with kerberos credential, 
the script do not supports Kerberos authentication, errors like below will 
occur:

1) Fail to deploy Yarn cluster.

 
{code:java}
start-scala-shell.sh yarn -n 3
Exception in thread "main" java.lang.RuntimeException: Error deploying the YARN 
cluster
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:594)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:81)
at 
org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:256)
at 
org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:165)
at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:189)
at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:188)
at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:137)
at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:318)
at 
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:303)
at 
org.apache.hadoop.security.UserGroupInformation.isAuthenticationMethodEnabled(UserGroupInformation.java:391)
at 
org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:385)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:384)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:351)
... 8 more
{code}
 

2)Fail to fetch deployed a Flink cluster, when using the following command :

bin/start-scala-shell.sh yarn

 
{code:java}
def fetchDeployedYarnClusterInfo(
configuration: Configuration,
configurationDirectory: String) = {


val args = ArrayBuffer[String](
"-m", "yarn-cluster"
)
{code}
when fething deployed yarn cluster, with param "-m yarn-cluster" it will create 
new one, but has no "-n", still fail.

 
 
 


> flink start-scala-shell.sh do not work in security mode with kerberos 
> authentication.
> -
>
> Key: FLINK-11204
> URL: https://issues.apache.org/jira/browse/FLINK-11204
> Project: Flink
>  Issue 

[GitHub] zentol commented on issue #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-12-19 Thread GitBox
zentol commented on issue #6782: [FLINK-9083][Cassandra Connector] Add async 
backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#issuecomment-448593297
 
 
   Will address my last comment while merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11204) flink start-scala-shell.sh do not work in security mode with kerberos authentication.

2018-12-19 Thread kelun wang (JIRA)
kelun wang created FLINK-11204:
--

 Summary: flink start-scala-shell.sh do not work in security mode 
with kerberos authentication.
 Key: FLINK-11204
 URL: https://issues.apache.org/jira/browse/FLINK-11204
 Project: Flink
  Issue Type: Bug
  Components: Scala Shell
Affects Versions: 1.7.0, 1.6.2, 1.5.5, 1.4.2, 1.3.3
Reporter: kelun wang


Hello,

When using flink start-scala-shell.sh in a cluster with Kerberos credentials, 
the script does not support Kerberos authentication; errors like the ones below 
will occur:

1) Fail to deploy Yarn cluster.

 
{code:java}
start-scala-shell.sh yarn -n 3
Exception in thread "main" java.lang.RuntimeException: Error deploying the YARN 
cluster
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:594)
at 
org.apache.flink.yarn.cli.FlinkYarnSessionCli.createCluster(FlinkYarnSessionCli.java:81)
at 
org.apache.flink.api.scala.FlinkShell$.deployNewYarnCluster(FlinkShell.scala:256)
at 
org.apache.flink.api.scala.FlinkShell$.fetchConnectionInfo(FlinkShell.scala:165)
at org.apache.flink.api.scala.FlinkShell$.liftedTree1$1(FlinkShell.scala:189)
at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:188)
at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:137)
at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
Caused by: java.lang.IllegalArgumentException: Can't get Kerberos realm
at 
org.apache.hadoop.security.HadoopKerberosName.setConfiguration(HadoopKerberosName.java:65)
at 
org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:318)
at 
org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:303)
at 
org.apache.hadoop.security.UserGroupInformation.isAuthenticationMethodEnabled(UserGroupInformation.java:391)
at 
org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled(UserGroupInformation.java:385)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:384)
at 
org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:351)
... 8 more
{code}
 

2) Fail to fetch a deployed Flink cluster when using the following command:

bin/start-scala-shell.sh yarn

 
{code:java}
def fetchDeployedYarnClusterInfo(
    configuration: Configuration,
    configurationDirectory: String) = {

  val args = ArrayBuffer[String](
    "-m", "yarn-cluster"
  )
{code}
When fetching the deployed YARN cluster, the param "-m yarn-cluster" causes a 
new cluster to be created instead, but since "-n" is not set, this still fails.

 
 
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dawidwys commented on issue #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-19 Thread GitBox
dawidwys commented on issue #7249: [FLINK-11048] Ability to programmatically 
execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#issuecomment-448590963
 
 
   Just to sum up my review:
   
   - Personally, I would prefer the method `execute(String jobName, 
SavepointSettings savepointSettings)`, but I am OK with the ctor as well
   - Before merging this PR, please annotate the newly introduced methods/ctor 
with `@Internal` or at least `@PublicEvolving`
   
   After that I am +1
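
   For reference, a sketch of that method variant (hypothetical; it mirrors the 
existing `execute(String)` shown in the diff below, with the settings threaded 
through to the remote execution):
   ```java
   public JobExecutionResult execute(String jobName, SavepointRestoreSettings savepointRestoreSettings)
           throws ProgramInvocationException {
       StreamGraph streamGraph = getStreamGraph();
       streamGraph.setJobName(jobName);
       transformations.clear();
       // assumed overload that forwards the per-execution savepoint settings
       return executeRemotely(streamGraph, jarFiles, savepointRestoreSettings);
   }
   ```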


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol closed pull request #7319: [hotfix][typo] fix typo in MLUtils

2018-12-19 Thread GitBox
zentol closed pull request #7319: [hotfix][typo] fix typo in MLUtils
URL: https://github.com/apache/flink/pull/7319
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala
index f4119f5ab60..4c4072a00bb 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala
@@ -35,7 +35,7 @@ import org.apache.flink.util.Collector
   *   The file format is specified [http://svmlight.joachims.org/ here].
   *
   * - writeLibSVM:
-  *   Writes a data set of [[LabeledVector]] in libSVM/SVMLight format to 
disk. THe file format
+  *   Writes a data set of [[LabeledVector]] in libSVM/SVMLight format to 
disk. The file format
   *   is specified [http://svmlight.joachims.org/ here].
   */
 object MLUtils {


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol closed pull request #7337: [hotfix] Fix typos in ExecutionGraph

2018-12-19 Thread GitBox
zentol closed pull request #7337: [hotfix] Fix typos in ExecutionGraph
URL: https://github.com/apache/flink/pull/7337
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 56315e07146..cf00640dbaa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -811,16 +811,16 @@ public Executor getFutureExecutor() {
	//  Actions
	// --------------------------------------------------------------------------------------------

-	public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
+	public void attachJobGraph(List<JobVertex> topologicallySorted) throws JobException {

		LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
				"vertices and {} intermediate results.",
-			topologiallySorted.size(), tasks.size(), intermediateResults.size());
+			topologicallySorted.size(), tasks.size(), intermediateResults.size());

-		final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
+		final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologicallySorted.size());
		final long createTimestamp = System.currentTimeMillis();

-		for (JobVertex jobVertex : topologiallySorted) {
+		for (JobVertex jobVertex : topologicallySorted) {

			if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
				this.isStoppable = false;


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on issue #7337: [hotfix] Fix typos in ExecutionGraph

2018-12-19 Thread GitBox
zentol commented on issue #7337: [hotfix] Fix typos in ExecutionGraph
URL: https://github.com/apache/flink/pull/7337#issuecomment-448589639
 
 
   This typo does not affect the semantics of the variable, but merging this PR 
would obfuscate the git history.
   
   I will hence close the PR; it just doesn't _really_ add value.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-19 Thread GitBox
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r242907203
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,33 +173,66 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   /**
+* Set savepoint restore settings that will be used when executing the 
job.
+* @param savepointRestoreSettings savepoint restore settings
+*/
+   public void setSavepointRestoreSettings(SavepointRestoreSettings 
savepointRestoreSettings) {
+   this.savepointRestoreSettings = savepointRestoreSettings;
 
 Review comment:
   This argument was countered by the fact that the other parameters are also 
not per-execution. I don't mind, both the constructor or the method work for me.
   
   Perhaps we could change the constructor to the method above @tweise? I think 
we should have reached consensus then :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kisimple opened a new pull request #7337: [hotfix] Fix typos in ExecutionGraph

2018-12-19 Thread GitBox
kisimple opened a new pull request #7337: [hotfix] Fix typos in ExecutionGraph
URL: https://github.com/apache/flink/pull/7337
 
 
   topologially -> topologically
   Rename the parameter.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11201) flink-test-utils dependency issue

2018-12-19 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724969#comment-16724969
 ] 

Chesnay Schepler commented on FLINK-11201:
--

In 1.6 the MiniClusterResource resided entirely in flink-test-utils.

In 1.7 we moved the core logic to flink-runtime (as test classes), retained the 
existing class in flink-test-utils as an alias (to not break existing tests), 
and added a dependency on flink-runtime:test-jar to flink-test-utils.

Looking at the dependency tree with Maven, the test-jar dependency on 
flink-runtime is correctly pulled in when depending on flink-test-utils; so as 
far as I can tell, this is either an issue on the side of sbt, or there's a 
magic property that should be set in your build.sbt to include transitive 
dependencies of test dependencies during testing.

In any case I would argue that this is not an issue on the side of Flink.

> flink-test-utils dependency issue
> -
>
> Key: FLINK-11201
> URL: https://issues.apache.org/jira/browse/FLINK-11201
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: eugen yushin
>Priority: Major
>
> Starting with Flink 1.7, the `runtime.testutils.MiniClusterResource` class is 
> missing from the `flink-test-utils` distribution.
> Steps to reproduce (Scala code)
> build.sbt
> {code}
> name := "flink-17-test-issue"
> organization := "x.y.z"
> scalaVersion := "2.11.12"
> val flinkVersion = "1.7.0"
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>   "org.scalatest" %% "scalatest" % "3.0.5" % Test,
>   "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test
> //  ,"org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier 
> Artifact.TestsClassifier
> )
> {code}
> test class:
> {code}
> class SimpleTest extends AbstractTestBase with FlatSpecLike {
>   implicit val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   env.setParallelism(1)
>   env.setRestartStrategy(RestartStrategies.noRestart())
>   "SimpleTest" should "work" in {
> val inputDs = env.fromElements(1,2,3)
> inputDs.print()
> env.execute()
>   }
> }
> {code}
> Results in:
> {code}
> A needed class was not found. This could be due to an error in your runpath. 
> Missing class: org/apache/flink/runtime/testutils/MiniClusterResource
> java.lang.NoClassDefFoundError: 
> org/apache/flink/runtime/testutils/MiniClusterResource
> ...
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.runtime.testutils.MiniClusterResource
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 31 more
> {code}
> This can be fixed by adding flink-runtime distribution with test classifier 
> into dependencies list.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11203) FunctionContext of AggregateFunction will not be initialized for window GroupBy

2018-12-19 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-11203:
---

 Summary: FunctionContext of AggregateFunction will not be 
initialized for window GroupBy
 Key: FLINK-11203
 URL: https://issues.apache.org/jira/browse/FLINK-11203
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Reporter: Hequn Cheng
Assignee: Hequn Cheng


Currently, in Table API/SQL, the implementation of group window aggregations is 
based on WindowedStream and 
{{org.apache.flink.api.common.functions.AggregateFunction}}. 
Due to FLINK-11198, metrics cannot be accessed within 
{{org.apache.flink.table.functions.AggregateFunction}} either. It would be nice 
to support metrics for both of them.
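
For context, a self-contained sketch of such a group window aggregation on the 
DataStream API (standard Flink 1.7-era API); the plain, non-rich 
AggregateFunction below has no open()/RuntimeContext and therefore no way to 
register metrics:
{code:java}
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowAggregateSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "a")
            .keyBy(value -> value)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            // must be a plain AggregateFunction; a RichAggregateFunction is rejected here
            .aggregate(new CountAggregate())
            .print();
        env.execute("window-aggregate-sketch");
    }

    private static class CountAggregate implements AggregateFunction<String, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(String value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }
}
{code}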



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-12-19 Thread GitBox
azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r242903778
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -686,6 +697,17 @@ public RocksDBNativeMetricOptions 
getMemoryWatcherOptions() {
return options;
}
 
+   /**
+* Gets the number of threads that will be used for downloading files from DFS during restore.
+*/
+   public int getNumberOfRestoringThreads() {
+   return numberOfRestoringThreads == -1 ? 
RocksDBOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue() : 
numberOfRestoringThreads;
 
 Review comment:
   Let's keep it in sync with the logic for `enableIncrementalCheckpointing` 
for now. 
   Later, we can revisit/refactor the backend configuration.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol opened a new pull request #7336: [FLINK-11194][hbase] Use type instead of classifier

2018-12-19 Thread GitBox
zentol opened a new pull request #7336: [FLINK-11194][hbase] Use type instead 
of classifier
URL: https://github.com/apache/flink/pull/7336
 
 
   ## What is the purpose of the change
   
   The hbase-connector module has been disabling the shade-plugin due to an 
infinite loop that occurs when creating the dependency-reduced pom. As a result, 
the released pom contained unresolved properties for the Scala version, which 
breaks the connector when using Scala 2.12.
   
   The loop is caused by having multiple instances of the same dependency with 
different classifiers. Classifiers are suffixes that are added to jars; most 
notably "tests" for the test-jar.
   The plugin however _can_ handle multiple instances with different _types_, 
like `test-jar`.
   
   Since we refer to the same artifact regardless, we can just use the `type` 
parameter to select the test-jar, preventing the loop.
   
   ## Brief change log
   
   * use `<type>` instead of `<classifier>`
   * remove custom shade-plugin configuration
   
   ## Verifying this change
   
   Compile the module and ensure that the dependency-reduced-pom.xml is created 
and does not contain unresolved properties.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11194) missing Scala 2.12 build of HBase connector

2018-12-19 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11194:
---
Labels: artifact build hbase pull-request-available scala  (was: artifact 
build hbase scala)

> missing Scala 2.12 build of HBase connector 
> 
>
> Key: FLINK-11194
> URL: https://issues.apache.org/jira/browse/FLINK-11194
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Build System
>Affects Versions: 1.7.0
> Environment: Scala version 2.12.7
> Flink version 1.7.0
>Reporter: Zhenhao Li
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: artifact, build, hbase, pull-request-available, scala
>
> See the following SBT log.
> ```
> [error] (update) sbt.librarymanagement.ResolveException: unresolved 
> dependency: org.apache.flink#flink-hbase_2.12;1.7.0: Resolution failed 
> several times for dependency: org.apache.flink#flink-hbase_2.12;1.7.0 
> \{compile=[default(compile)]}:: 
> [error]     java.text.ParseException: inconsistent module descriptor file 
> found in 
> 'https://repo1.maven.org/maven2/org/apache/flink/flink-hbase_2.12/1.7.0/flink-hbase_2.12-1.7.0.pom':
>  bad module name: expected='flink-hbase_2.12' found='flink-hbase_2.11'; 
> [error]     java.text.ParseException: inconsistent module descriptor file 
> found in 
> 'https://repo1.maven.org/maven2/org/apache/flink/flink-hbase_2.12/1.7.0/flink-hbase_2.12-1.7.0.pom':
>  bad module name: expected='flink-hbase_2.12' found='flink-hbase_2.11';
> ```



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-12-19 Thread GitBox
azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r242901812
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -120,6 +120,9 @@
/** This determines if incremental checkpointing is enabled. */
private final TernaryBoolean enableIncrementalCheckpointing;
 
+   /** Number of threads used to download files from DFS during restore; default value: 1. */
+   private int numberOfRestoringThreads;
 
 Review comment:
   We introduced a setter for it, `setNumberOfRestoringThreads`, so as not to 
explode the number of constructor args like for `enableIncrementalCheckpointing`. 
At the moment, `RocksDBStateBackend` basically serves as a configuration object 
and factory for the keyed and operator backends.
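
   For illustration, a hypothetical usage of the setter discussed here (the 
method name follows this PR and may change before merging):
   ```java
   // assumes org.apache.flink.contrib.streaming.state.RocksDBStateBackend
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   // enable incremental checkpoints; restore state files from DFS with 4 threads
   RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
   backend.setNumberOfRestoringThreads(4);
   env.setStateBackend(backend);
   ```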


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11198) Access to MetricGroup in an AggregateFunction(Non Rich)

2018-12-19 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724959#comment-16724959
 ] 

Hequn Cheng commented on FLINK-11198:
-

[~chiggi_dev] Thanks for opening the issue. It would be nice to make metrics 
accessible from AggregateFunction. However, we should find a neat way to do 
this.
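
For context, a minimal sketch of how custom metrics are registered today via a 
RichFunction (standard metrics API); the point of this issue is that 
WindowedStream#aggregate rejects RichAggregateFunction, so this path is not 
available there:
{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter counter;

    @Override
    public void open(Configuration parameters) {
        // the MetricGroup is reachable only through the RuntimeContext,
        // which only Rich* functions expose
        this.counter = getRuntimeContext().getMetricGroup().counter("mapped");
    }

    @Override
    public String map(String value) {
        counter.inc();
        return value;
    }
}
{code}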

> Access to MetricGroup in an AggregateFunction(Non Rich)
> ---
>
> Key: FLINK-11198
> URL: https://issues.apache.org/jira/browse/FLINK-11198
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Affects Versions: 1.6.2
>Reporter: Chirag Dewan
>Assignee: Hequn Cheng
>Priority: Major
>
> The only way to add custom metrics from a UDF is through the RuntimeContext, 
> and the RuntimeContext is wired into every RichFunction implementation.
> However, for aggregate() on a WindowedStream, we cannot use the Rich version 
> of AggregateFunction. As I remotely understand, this is done to avoid exposing 
> the state in the Aggregate UDF. 
> But can we have some minimal context which does not expose state but provides 
> metrics, the classloader, etc. in the UDF? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector

2018-12-19 Thread GitBox
zentol commented on a change in pull request #6782: [FLINK-9083][Cassandra 
Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r242899744
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseConfig.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.cassandra;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Configuration for {@link CassandraSinkBase}.
+ */
+public class CassandraSinkBaseConfig implements Serializable  {
+   // ------------------------------ Default Configurations ------------------------------
+
+   /**
+* The default maximum number of concurrent requests. By default, 
{@code Integer.MAX_VALUE}.
+*/
+   public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = 
Integer.MAX_VALUE;
+
+   /**
+* The default timeout duration when acquiring a permit to execute. By 
default, {@code Long.MAX_VALUE}.
+*/
+   public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = 
Long.MAX_VALUE;
+
+   /**
+* The default timeout unit when acquiring a permit to execute. By 
default, milliseconds.
+*/
+   public static final TimeUnit 
DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+   // ------------------------------ Configuration Fields ------------------------------
+
+   /** Maximum number of concurrent requests allowed. */
+   private final int maxConcurrentRequests;
+
+   /** Timeout duration when acquiring a permit to execute. */
+   private final long maxConcurrentRequestsTimeout;
+
+   /** Timeout unit when acquiring a permit to execute. */
+   private final TimeUnit maxConcurrentRequestsTimeoutUnit;
+
+   public CassandraSinkBaseConfig(
+   int maxConcurrentRequests,
+   long maxConcurrentRequestsTimeout,
+   TimeUnit maxConcurrentRequestsTimeoutUnit) {
+   Preconditions.checkArgument(maxConcurrentRequests >= 0,
+   "Max concurrent requests is expected to be positive");
+   Preconditions.checkArgument(maxConcurrentRequestsTimeout >= 0,
+   "Max concurrent requests timeout is expected to be 
positive");
+   Preconditions.checkNotNull(maxConcurrentRequestsTimeoutUnit,
+   "Max concurrent requests timeout unit cannot be null");
+   this.maxConcurrentRequests = maxConcurrentRequests;
+   this.maxConcurrentRequestsTimeout = 
maxConcurrentRequestsTimeout;
+   this.maxConcurrentRequestsTimeoutUnit = 
maxConcurrentRequestsTimeoutUnit;
+   }
+
+   public int getMaxConcurrentRequests() {
+   return maxConcurrentRequests;
+   }
+
+   public long getMaxConcurrentRequestsTimeout() {
+   return maxConcurrentRequestsTimeout;
+   }
+
+   public TimeUnit getMaxConcurrentRequestsTimeoutUnit() {
+   return maxConcurrentRequestsTimeoutUnit;
+   }
+
+   @Override
+   public String toString() {
+   return "CassandraSinkBaseConfig{" +
+   "maxConcurrentRequests=" + maxConcurrentRequests +
+   ", maxConcurrentRequestsTimeout=" + 
maxConcurrentRequestsTimeout +
+   ", maxConcurrentRequestsTimeoutUnit=" + 
maxConcurrentRequestsTimeoutUnit +
+   '}';
+   }
+
+   public static Builder newBuilder() {
 
 Review comment:
   This comment hasn't been addressed yet; the `Builder` constructor is still 
public.
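
   As a sketch, the usage this points towards once the `Builder` constructor is 
private (the setter names on the builder are assumed here; only `newBuilder()` 
appears in the quoted diff):
   ```java
   import java.util.concurrent.TimeUnit;

   // newBuilder() becomes the only entry point for constructing the config
   CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
       .setMaxConcurrentRequests(1000)
       .setMaxConcurrentRequestsTimeout(30_000L, TimeUnit.MILLISECONDS)
       .build();
   ```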


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-12-19 Thread GitBox
zentol commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r242898360
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ##
 @@ -120,6 +120,9 @@
/** This determines if incremental checkpointing is enabled. */
private final TernaryBoolean enableIncrementalCheckpointing;
 
+   /** Number of threads used to download files from DFS during restore; default value: 1. */
+   private int numberOfRestoringThreads;
 
 Review comment:
   i think this can also be final


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-11198) Access to MetricGroup in an AggregateFunction(Non Rich)

2018-12-19 Thread Hequn Cheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng reassigned FLINK-11198:
---

Assignee: Hequn Cheng

> Access to MetricGroup in an AggregateFunction(Non Rich)
> ---
>
> Key: FLINK-11198
> URL: https://issues.apache.org/jira/browse/FLINK-11198
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Affects Versions: 1.6.2
>Reporter: Chirag Dewan
>Assignee: Hequn Cheng
>Priority: Major
>
> The only way to add custom metrics from a UDF is through the RuntimeContext, 
> and the RuntimeContext is wired into every RichFunction implementation.
> However, for aggregate() on a WindowedStream, we cannot use the Rich version 
> of AggregateFunction. As I remotely understand, this is done to avoid exposing 
> the state in the Aggregate UDF. 
> But can we have some minimal context which does not expose state but provides 
> metrics, the classloader, etc. in the UDF? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dawidwys commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-19 Thread GitBox
dawidwys commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r242894460
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,33 +173,66 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   /**
+* Set savepoint restore settings that will be used when executing the 
job.
+* @param savepointRestoreSettings savepoint restore settings
+*/
+   public void setSavepointRestoreSettings(SavepointRestoreSettings 
savepointRestoreSettings) {
+   this.savepointRestoreSettings = savepointRestoreSettings;
 
 Review comment:
   Personally I do prefer the method. With the constructor your original 
concern still stands: 
   >  Savepoint restore should be configured per-execution basis and not for 
the lifetime of the RemoteStreamEnvironment


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.

2018-12-19 Thread GitBox
kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed 
randomness from HDFS and Local fs writers.
URL: https://github.com/apache/flink/pull/7313#issuecomment-448570597
 
 
   Actually the only scenario where the randomness in the tmp files can help is 
in the case where we have a "split-brain" scenario. 
   
   In this scenario, some TaskManagers (TMs) are considered down but they are 
not. So the job restarts,  but the old TMs keep on processing data. In this 
case, the old TMs will not be able to commit any data, as they have **no** 
JobManager (JM) so no checkpoints, but they will be writing in temporary files. 
Now the new TMs, will also be writing in tmp files (and then committing them). 
If there is no randomness, then old and new TMs may be writing in the same temp 
files (if the FS allows it). 
   
   That said, I am not so sure if this scenario can happen, _i.e._ if TMs can 
keep running without a JM and I am not sure if handling it at the sink is 
enough. @StephanEwen and @aljoscha ?
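
   For illustration, the kind of tmp-file naming at stake (hypothetical helper; 
the actual writers assemble their names differently):
   ```java
   // without the random suffix, an old and a new subtask with the same index
   // would both open the same in-progress file
   String tmpName = ".part-" + subtaskIndex + "-" + partCounter
       + ".inprogress." + java.util.UUID.randomUUID();
   ```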


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u commented on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.

2018-12-19 Thread GitBox
kl0u commented on issue #7313: [FLINK-11116][fs-connector] Removed randomness 
from HDFS and Local fs writers.
URL: https://github.com/apache/flink/pull/7313#issuecomment-448570597
 
 
   Actually the only scenario where the randomness in the tmp files can help is 
in the case where we have a "split-brain" scenario. 
   
   In this scenario, some TaskManagers (TMs) are considered down but they are 
not. So the job restarts,  but the old TMs keep on processing data. In this 
case, the old TMs will not be able to commit any data, as they have **no** 
JobManager (JM) so no checkpoints, but they will be writing in temporary files. 
Now the new TMs, will also be writing in tmp files (and then committing them). 
If there is no randomness, then old and new TMs may be writing in the same temp 
files (if the FS allows it). 
   
   That said, I am not so sure if this scenario can happen, _i.e._ if TMs can 
keep running without a JM.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.

2018-12-19 Thread GitBox
kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed 
randomness from HDFS and Local fs writers.
URL: https://github.com/apache/flink/pull/7313#issuecomment-448570597
 
 
   Actually the only scenario where the randomness in the tmp files can help is 
in the case where we have a "split-brain" scenario. 
   
   In this scenario, some TaskManagers (TMs) are considered down but they are 
not. So the job restarts,  but the old TMs keep on processing data. In this 
case, the old TMs will not be able to commit any data, as they have **no** 
JobManager (JM) so no checkpoints, but they will be writing in temporary files. 
Now the new TMs, will also be writing in tmp files (and then committing them). 
If there is no randomness, then old and new TMs may be writing in the same temp 
files (if the FS allows it). 
   
   That said, I am not so sure if this scenario can happen, _i.e._ if TMs can 
keep running without a JM. @StephanEwen ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.

2018-12-19 Thread GitBox
kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed 
randomness from HDFS and Local fs writers.
URL: https://github.com/apache/flink/pull/7313#issuecomment-448570597
 
 
   Actually the only scenario where the randomness in the tmp files can help is 
in the case where we have a "split-brain" scenario. 
   
   In this scenario, some TaskManagers (TMs) are considered down but they are 
not. So the job restarts,  but the old TMs keep on processing data. In this 
case, the old TMs will not be able to commit any data, as they have **no** 
JobManager (JM) so no checkpoints, but they will be writing in temporary files. 
Now the new TMs, will also be writing in tmp files (and then committing them). 
If there is no randomness, then old and new TMs may be writing in the same temp 
files (if the FS allows it). 
   
   That said, I am not so sure if this scenario can happen, _i.e._ if TMs can 
keep running without a JM and I am not sure if handling it at the sink is 
enough. @StephanEwen ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.

2018-12-19 Thread GitBox
kl0u edited a comment on issue #7313: [FLINK-11116][fs-connector] Removed 
randomness from HDFS and Local fs writers.
URL: https://github.com/apache/flink/pull/7313#issuecomment-448519244
 
 
   Thanks for the review @igalshilman ! 
   
   I am not sure of the reason for introducing the randomness in the first 
place. I would assume that this was mainly out of extra cautiousness, to not 
overwrite already existing data, although in the case of `HDFS` and the `local` 
FS this is guaranteed (for committed data) by the naming convention used. Maybe 
@StephanEwen, who introduced this, has something more to add.
   
   Apart from that, I integrated your comments. Let me know if it looks ok now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11201) flink-test-utils dependency issue

2018-12-19 Thread eugen yushin (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724931#comment-16724931
 ] 

eugen yushin commented on FLINK-11201:
--

It works like a charm in 1.6, so I think something has changed in 1.7 
regarding dependency management for flink-test-utils.
Btw, the class from the error, 
`org.apache.flink.runtime.testutils.MiniClusterResource`, comes from the test 
sources of flink-runtime, not the main source code.

> flink-test-utils dependency issue
> -
>
> Key: FLINK-11201
> URL: https://issues.apache.org/jira/browse/FLINK-11201
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: eugen yushin
>Priority: Major
>
> Starting with Flink 1.7, the `runtime.testutils.MiniClusterResource` class is 
> missing from the `flink-test-utils` distribution.
> Steps to reproduce (Scala code)
> build.sbt
> {code}
> name := "flink-17-test-issue"
> organization := "x.y.z"
> scalaVersion := "2.11.12"
> val flinkVersion = "1.7.0"
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>   "org.scalatest" %% "scalatest" % "3.0.5" % Test,
>   "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test
> //  ,"org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier 
> Artifact.TestsClassifier
> )
> {code}
> test class:
> {code}
> class SimpleTest extends AbstractTestBase with FlatSpecLike {
>   implicit val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   env.setParallelism(1)
>   env.setRestartStrategy(RestartStrategies.noRestart())
>   "SimpleTest" should "work" in {
> val inputDs = env.fromElements(1,2,3)
> inputDs.print()
> env.execute()
>   }
> }
> {code}
> Results in:
> {code}
> A needed class was not found. This could be due to an error in your runpath. 
> Missing class: org/apache/flink/runtime/testutils/MiniClusterResource
> java.lang.NoClassDefFoundError: 
> org/apache/flink/runtime/testutils/MiniClusterResource
> ...
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.runtime.testutils.MiniClusterResource
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 31 more
> {code}
> This can be fixed by adding flink-runtime distribution with test classifier 
> into dependencies list.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11180) ProcessFailureCancelingITCase#testCancelingOnProcessFailure

2018-12-19 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11180?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-11180.

   Resolution: Fixed
Fix Version/s: 1.8.0
   1.7.2

master: b02121c4673f80e606c7211d73de29c7295df7ae
1.7: 4113c6b57a9f9daefc00969b31fd202b02c4ac3f

> ProcessFailureCancelingITCase#testCancelingOnProcessFailure
> ---
>
> Key: FLINK-11180
> URL: https://issues.apache.org/jira/browse/FLINK-11180
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: sunjincheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.2, 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> tag: release-1.7.1-rc2
> org.apache.flink.util.FlinkException: Could not create the 
> DispatcherResourceManagerComponent.
> at 
> org.apache.flink.runtime.entrypoint.component.AbstractDispatcherResourceManagerComponentFactory.create(AbstractDispatcherResourceManagerComponentFactory.java:242)
>  at 
> org.apache.flink.test.recovery.ProcessFailureCancelingITCase.testCancelingOnProcessFailure(ProcessFailureCancelingITCase.java:148)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>  at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>  at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>  at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>  at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>  at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>  at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>  at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>  at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>  at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>  at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>  at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>  at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>  at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>  at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>  at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>  at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>  at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> Caused by: java.net.BindException: Address already in use
>  at sun.nio.ch.Net.bind0(Native Method)
>  at sun.nio.ch.Net.bind(Net.java:433)
>  at sun.nio.ch.Net.bind(Net.java:425)
>  at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1358)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:1019)
>  at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254)
>  at 
> org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:366)
>  at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>  at 
> 

[GitHub] zentol closed pull request #7327: [FLINK-11180][tests] Fix ProcessFailureCancelingITCase.testCancelingOnProcessFailure error by specifying a free port

2018-12-19 Thread GitBox
zentol closed pull request #7327: [FLINK-11180][tests] Fix 
ProcessFailureCancelingITCase.testCancelingOnProcessFailure error by specifying 
a free port
URL: https://github.com/apache/flink/pull/7327
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index ed987d677d8..96e289dbf5c 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -32,6 +32,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
@@ -116,6 +117,7 @@ public void testCancelingOnProcessFailure() throws 
Exception {
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+   config.setInteger(RestOptions.PORT, 0);
 
final RpcService rpcService = 
AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
final int jobManagerPort = rpcService.getPort();


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-19 Thread GitBox
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r242884502
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,33 +173,66 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   /**
+* Set savepoint restore settings that will be used when executing the 
job.
+* @param savepointRestoreSettings savepoint restore settings
+*/
+   public void setSavepointRestoreSettings(SavepointRestoreSettings 
savepointRestoreSettings) {
+   this.savepointRestoreSettings = savepointRestoreSettings;
 
 Review comment:
   Unless you prefer the method instead of the constructor, then this should be 
ready to be merged :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-19 Thread GitBox
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r242884359
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -169,33 +173,66 @@ public RemoteStreamEnvironment(String host, int port, 
Configuration clientConfig
}
}
 
-   @Override
-   public JobExecutionResult execute(String jobName) throws 
ProgramInvocationException {
-   StreamGraph streamGraph = getStreamGraph();
-   streamGraph.setJobName(jobName);
-   transformations.clear();
-   return executeRemotely(streamGraph, jarFiles);
+   /**
+* Set savepoint restore settings that will be used when executing the 
job.
+* @param savepointRestoreSettings savepoint restore settings
+*/
+   public void setSavepointRestoreSettings(SavepointRestoreSettings 
savepointRestoreSettings) {
+   this.savepointRestoreSettings = savepointRestoreSettings;
 
 Review comment:
To be fair, the code above has been changed; it's not a setter anymore but is 
set in the constructor. You summed up quite nicely what is actually the state 
of this PR now. Nothing is broken in terms of backwards-compatibility but the 
structure changed a bit which makes it hard to review. The only difference is 
that an additional constructor has been added for the savepoint settings 
instead of this method you proposed:
   
   ```
   public JobExecutionResult execute(String jobName, SavepointRestoreSettings 
savepointRestoreSettings) throws ProgramInvocationException {
   ```
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys closed pull request #7321: [hotfix] Fix typo in AbstractStreamOperator

2018-12-19 Thread GitBox
dawidwys closed pull request #7321: [hotfix] Fix typo in AbstractStreamOperator
URL: https://github.com/apache/flink/pull/7321
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index d9a195cb235..fa5231ec1ed 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -639,7 +639,7 @@ public Object getCurrentKey() {
if (keyedStateBackend != null) {
return keyedStateBackend.getCurrentKey();
} else {
-   throw new UnsupportedOperationException("Key can only 
be retrieven on KeyedStream.");
+   throw new UnsupportedOperationException("Key can only 
be retrieved on KeyedStream.");
}
}
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10672) Task stuck while writing output to flink

2018-12-19 Thread Maximilian Michels (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724901#comment-16724901
 ] 

Maximilian Michels commented on FLINK-10672:


No, [~thw] just set the Fix Version to empty because this is not fixed in 1.5.6.

> Task stuck while writing output to flink
> 
>
> Key: FLINK-10672
> URL: https://issues.apache.org/jira/browse/FLINK-10672
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.4
> Environment: OS: Debian rodente 4.17
> Flink version: 1.5.4
> ||Key||Value||
> |jobmanager.heap.mb|1024|
> |jobmanager.rpc.address|localhost|
> |jobmanager.rpc.port|6123|
> |metrics.reporter.jmx.class|org.apache.flink.metrics.jmx.JMXReporter|
> |metrics.reporter.jmx.port|9250-9260|
> |metrics.reporters|jmx|
> |parallelism.default|1|
> |rest.port|8081|
> |taskmanager.heap.mb|1024|
> |taskmanager.numberOfTaskSlots|1|
> |web.tmpdir|/tmp/flink-web-bdb73d6c-5b9e-47b5-9ebf-eed0a7c82c26|
>  
> h1. Overview
> ||Data Port||All Slots||Free Slots||CPU Cores||Physical Memory||JVM Heap 
> Size||Flink Managed Memory||
> |43501|1|0|12|62.9 GB|922 MB|642 MB|
> h1. Memory
> h2. JVM (Heap/Non-Heap)
> ||Type||Committed||Used||Maximum||
> |Heap|922 MB|575 MB|922 MB|
> |Non-Heap|68.8 MB|64.3 MB|-1 B|
> |Total|991 MB|639 MB|922 MB|
> h2. Outside JVM
> ||Type||Count||Used||Capacity||
> |Direct|3,292|105 MB|105 MB|
> |Mapped|0|0 B|0 B|
> h1. Network
> h2. Memory Segments
> ||Type||Count||
> |Available|3,194|
> |Total|3,278|
> h1. Garbage Collection
> ||Collector||Count||Time||
> |G1_Young_Generation|13|336|
> |G1_Old_Generation|1|21|
>Reporter: Ankur Goenka
>Priority: Major
>  Labels: beam
> Attachments: 1uruvakHxBu.png, 3aDKQ24WvKk.png, Po89UGDn58V.png, 
> jmx_dump.json, jmx_dump_detailed.json, jstack_129827.log, jstack_163822.log, 
> jstack_66985.log
>
>
> I am running a fairly complex pipeline with 200+ tasks.
> The pipeline works fine with small data (order of 10kb input) but gets stuck 
> with slightly larger data (300kb input).
>  
> The task gets stuck while writing the output to Flink; more specifically, it 
> gets stuck while requesting a memory segment in the local buffer pool. The Task 
> Manager UI shows that it has enough memory and memory segments to work with.
> The relevant stack trace is 
> {quote}"grpc-default-executor-0" #138 daemon prio=5 os_prio=0 
> tid=0x7fedb0163800 nid=0x30b7f in Object.wait() [0x7fedb4f9]
>  java.lang.Thread.State: TIMED_WAITING (on object monitor)
>  at (C/C++) 0x7fef201c7dae (Unknown Source)
>  at (C/C++) 0x7fef1f2aea07 (Unknown Source)
>  at (C/C++) 0x7fef1f241cd3 (Unknown Source)
>  at java.lang.Object.wait(Native Method)
>  - waiting on <0xf6d56450> (a java.util.ArrayDeque)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:247)
>  - locked <0xf6d56450> (a java.util.ArrayDeque)
>  at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
>  at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
>  at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>  at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:42)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStagePruningFunction.flatMap(FlinkExecutableStagePruningFunction.java:26)
>  at 
> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
>  at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>  at 
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction$MyDataReceiver.accept(FlinkExecutableStageFunction.java:230)
>  - locked <0xf6a60bd0> (a java.lang.Object)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:81)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:32)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:139)
>  at 
> org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:125)
>  at 
> 
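
A common first mitigation when tasks block in 
LocalBufferPool.requestMemorySegment is to give the network stack more buffer 
memory. A sketch of flink-conf.yaml settings (illustrative values only; this 
does not address the underlying deadlock):

{code}
# Sketch: raise the network buffer pool; keys exist in Flink 1.5+.
taskmanager.network.memory.fraction: 0.2
taskmanager.network.memory.min: 268435456   # 256 MB
taskmanager.network.memory.max: 1073741824  # 1 GB
{code}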

[GitHub] dawidwys closed pull request #7335: Release 1.7

2018-12-19 Thread GitBox
dawidwys closed pull request #7335: Release 1.7
URL: https://github.com/apache/flink/pull/7335
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11186) Event-time balancing for multiple Kafka partitions

2018-12-19 Thread Tom Schamberger (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724899#comment-16724899
 ] 

Tom Schamberger commented on FLINK-11186:
-

[~Paul Lin] The proposed source interface in FLIP-27 is an important 
precondition for aligning the event time of different partitions within 
individual source subtasks. But if the parallelism of the source operator is 
greater than one, the source subtasks have to somehow communicate how far 
each partition should be allowed to progress. This could be done via 
back-pressure produced by succeeding operators, as in the sketch below.
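
A minimal sketch of the alignment idea on the raw Kafka client (this is not how 
Flink's consumer works today; `alignPartitions`, `watermarks` and `maxDriftMs` 
are assumed names): partitions whose event time runs ahead of the slowest 
partition by more than a threshold are paused until the others catch up.

{code}
import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object PartitionAligner {
  // Sketch only: pause event-time-fast partitions, resume in-sync ones.
  // `watermarks` holds the current per-partition event-time watermark (ms),
  // `maxDriftMs` the allowed gap to the slowest partition.
  def alignPartitions(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
                      watermarks: Map[TopicPartition, Long],
                      maxDriftMs: Long): Unit = {
    if (watermarks.nonEmpty) {
      val minWatermark = watermarks.values.min
      val (ahead, inSync) =
        watermarks.keys.toSeq.partition(tp => watermarks(tp) > minWatermark + maxDriftMs)
      consumer.pause(ahead.asJava)   // stop fetching from fast partitions
      consumer.resume(inSync.asJava) // let slow partitions catch up
    }
  }
}
{code}

With parallel sources, the pausing decision would additionally have to be 
coordinated across subtasks, which is exactly the communication problem 
described above.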

> Event-time balancing for multiple Kafka partitions
> --
>
> Key: FLINK-11186
> URL: https://issues.apache.org/jira/browse/FLINK-11186
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API, Kafka Connector
>Reporter: Tom Schamberger
>Priority: Major
>
> Currently, it is not possible with Flink to back-pressure individual Kafka 
> partitions, which are faster in terms of event-time. This leads to 
> unnecessary memory consumption and can lead to deadlocks in the case of 
> back-pressure.
> When multiple Kafka topics are consumed, succeeding event-time window 
> operators have to wait until the last Kafka partition has produced a 
> sufficient watermark to be triggered. If individual Kafka partitions differ 
> in read performance or the event-time of messages within partitions is not 
> monotonically distributed, this can lead to a situation where 'fast' 
> partitions (event-time makes fast progress) outperform slower partitions 
> until back-pressuring prevents all partitions from being further consumed. 
> This leads to a deadlock of the application.
> I suggest that windows should be able to back-pressure individual 
> partitions, which progress faster in terms of event-time, so that slow 
> partitions can keep up.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11201) flink-test-utils dependency issue

2018-12-19 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724897#comment-16724897
 ] 

Chesnay Schepler commented on FLINK-11201:
--

flink-test-utils has a dependency on flink-runtime, including the test-jar. I 
don't understand why this isn't pulled in; is this an sbt thing?
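
For reference, the workaround hinted at by the commented-out line in the 
reporter's build.sbt below is to add flink-runtime's test-jar explicitly; that 
sbt skips test-jar classifiers of transitive Maven dependencies is an 
assumption here, not a confirmed root cause:

{code}
// Workaround sketch for build.sbt: depend on flink-runtime's test-jar directly.
libraryDependencies +=
  "org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier Artifact.TestsClassifier
{code}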

> flink-test-utils dependency issue
> -
>
> Key: FLINK-11201
> URL: https://issues.apache.org/jira/browse/FLINK-11201
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: eugen yushin
>Priority: Major
>
> Starting with Flink 1.7, the `runtime.testutils.MiniClusterResource` class is 
> missing from the `flink-test-utils` distribution.
> Steps to reproduce (Scala code)
> build.sbt
> {code}
> name := "flink-17-test-issue"
> organization := "x.y.z"
> scalaVersion := "2.11.12"
> val flinkVersion = "1.7.0"
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>   "org.scalatest" %% "scalatest" % "3.0.5" % Test,
>   "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test
> //  ,"org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier 
> Artifact.TestsClassifier
> )
> {code}
> test class:
> {code}
> class SimpleTest extends AbstractTestBase with FlatSpecLike {
>   implicit val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
>   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>   env.setParallelism(1)
>   env.setRestartStrategy(RestartStrategies.noRestart())
>   "SimpleTest" should "work" in {
> val inputDs = env.fromElements(1,2,3)
> inputDs.print()
> env.execute()
>   }
> }
> {code}
> Results in:
> {code}
> A needed class was not found. This could be due to an error in your runpath. 
> Missing class: org/apache/flink/runtime/testutils/MiniClusterResource
> java.lang.NoClassDefFoundError: 
> org/apache/flink/runtime/testutils/MiniClusterResource
> ...
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.runtime.testutils.MiniClusterResource
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   ... 31 more
> {code}
> This can be fixed by adding flink-runtime distribution with test classifier 
> into dependencies list.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11194) missing Scala 2.12 build of HBase connector

2018-12-19 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16724896#comment-16724896
 ] 

Chesnay Schepler commented on FLINK-11194:
--

Hmm... so it's not that flink-hbase was not released for Scala 2.12; the 
issue is that the published pom does not refer to Scala 2.12 explicitly, but 
to the scala version property instead:
{code}
<artifactId>flink-hbase_${scala.binary.version}</artifactId>
<name>flink-hbase</name>
<packaging>jar</packaging>
{code}

The reason for this is simple; the shade-plugin is completely disabled in this 
module.

Let's see what happens if we just enable it again...
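
For comparison, once the property is interpolated, the published pom should 
carry the literal suffix that resolvers expect (per the 
expected='flink-hbase_2.12' in the error below):

{code}
<artifactId>flink-hbase_2.12</artifactId>
{code}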

> missing Scala 2.12 build of HBase connector 
> 
>
> Key: FLINK-11194
> URL: https://issues.apache.org/jira/browse/FLINK-11194
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Build System
>Affects Versions: 1.7.0
> Environment: Scala version 2.12.7
> Flink version 1.7.0
>Reporter: Zhenhao Li
>Assignee: vinoyang
>Priority: Major
>  Labels: artifact, build, hbase, scala
>
> See the following SBT log.
> ```
> [error] (update) sbt.librarymanagement.ResolveException: unresolved 
> dependency: org.apache.flink#flink-hbase_2.12;1.7.0: Resolution failed 
> several times for dependency: org.apache.flink#flink-hbase_2.12;1.7.0 
> \{compile=[default(compile)]}:: 
> [error]     java.text.ParseException: inconsistent module descriptor file 
> found in 
> 'https://repo1.maven.org/maven2/org/apache/flink/flink-hbase_2.12/1.7.0/flink-hbase_2.12-1.7.0.pom':
>  bad module name: expected='flink-hbase_2.12' found='flink-hbase_2.11'; 
> [error]     java.text.ParseException: inconsistent module descriptor file 
> found in 
> 'https://repo1.maven.org/maven2/org/apache/flink/flink-hbase_2.12/1.7.0/flink-hbase_2.12-1.7.0.pom':
>  bad module name: expected='flink-hbase_2.12' found='flink-hbase_2.11';
> ```



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11198) Access to MetricGroup in an AggregateFunction(Non Rich)

2018-12-19 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11198:
-
Fix Version/s: (was: 1.6.3)

> Access to MetricGroup in an AggregateFunction(Non Rich)
> ---
>
> Key: FLINK-11198
> URL: https://issues.apache.org/jira/browse/FLINK-11198
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Affects Versions: 1.6.2
>Reporter: Chirag Dewan
>Priority: Major
>
> The only way to add custom metrics from a UDF is through the RuntimeContext, 
> and the RuntimeContext is wired into every RichFunction implementation.
> However, for aggregate() on a WindowedStream, we cannot use the Rich version 
> of AggregateFunction. As far as I understand, this is done to avoid exposing 
> the state in the aggregate UDF.
> But can we have some minimal context which does not expose state but provides 
> metrics, the classloader, etc. in the UDF? 
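
A sketch of the usual workaround, assuming per-window metric updates are 
acceptable: keep the AggregateFunction non-rich and attach a rich 
ProcessWindowFunction via the two-argument aggregate(), registering the 
metrics there. Names like `CountAgg` and `windowsFired` are illustrative:

{code}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Non-rich pre-aggregator: counts elements per key and window.
class CountAgg extends AggregateFunction[String, Long, Long] {
  override def createAccumulator(): Long = 0L
  override def add(value: String, acc: Long): Long = acc + 1
  override def getResult(acc: Long): Long = acc
  override def merge(a: Long, b: Long): Long = a + b
}

// Rich window function: has a RuntimeContext, so custom metrics go here.
class EmitWithMetrics extends ProcessWindowFunction[Long, Long, String, TimeWindow] {
  @transient private var windowsFired: Counter = _

  override def open(parameters: Configuration): Unit = {
    windowsFired = getRuntimeContext.getMetricGroup.counter("windowsFired")
  }

  override def process(key: String, context: Context,
                       elements: Iterable[Long], out: Collector[Long]): Unit = {
    windowsFired.inc()
    out.collect(elements.head) // exactly one pre-aggregated value per window
  }
}

// Usage: input.keyBy(...).timeWindow(...).aggregate(new CountAgg, new EmitWithMetrics)
{code}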



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11194) missing Scala 2.12 build of HBase connector

2018-12-19 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reassigned FLINK-11194:


Assignee: Chesnay Schepler  (was: vinoyang)

> missing Scala 2.12 build of HBase connector 
> 
>
> Key: FLINK-11194
> URL: https://issues.apache.org/jira/browse/FLINK-11194
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats, Build System
>Affects Versions: 1.7.0
> Environment: Scala version 2.12.7
> Flink version 1.7.0
>Reporter: Zhenhao Li
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: artifact, build, hbase, scala
>
> See the following SBT log.
> ```
> [error] (update) sbt.librarymanagement.ResolveException: unresolved 
> dependency: org.apache.flink#flink-hbase_2.12;1.7.0: Resolution failed 
> several times for dependency: org.apache.flink#flink-hbase_2.12;1.7.0 
> \{compile=[default(compile)]}:: 
> [error]     java.text.ParseException: inconsistent module descriptor file 
> found in 
> 'https://repo1.maven.org/maven2/org/apache/flink/flink-hbase_2.12/1.7.0/flink-hbase_2.12-1.7.0.pom':
>  bad module name: expected='flink-hbase_2.12' found='flink-hbase_2.11'; 
> [error]     java.text.ParseException: inconsistent module descriptor file 
> found in 
> 'https://repo1.maven.org/maven2/org/apache/flink/flink-hbase_2.12/1.7.0/flink-hbase_2.12-1.7.0.pom':
>  bad module name: expected='flink-hbase_2.12' found='flink-hbase_2.11';
> ```



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

