[GitHub] bowenli86 commented on issue #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-02-11 Thread GitBox
bowenli86 commented on issue #7643: [FLINK-11474][table] Add ReadableCatalog, 
ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#issuecomment-462653964
 
 
   Just took another look and found this PR needs to adapt to the new 
flink-table structure given that flink-table has been moved out of 
flink-libraries and broken into several new modules. Specifically, all the new 
classes need to be in flink/flink-table/flink-table-planner, where the existing 
ExternalCatalog and related code reside


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] bowenli86 removed a comment on issue #7643: [FLINK-11474][table] Add ReadableCatalog, ReadableWritableCatalog, and other …

2019-02-11 Thread GitBox
bowenli86 removed a comment on issue #7643: [FLINK-11474][table] Add 
ReadableCatalog, ReadableWritableCatalog, and other …
URL: https://github.com/apache/flink/pull/7643#issuecomment-461575592
 
 
   LGTM  +1 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-11 Thread GitBox
becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka 
Connector] Add ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#discussion_r255822550
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/RateLimiterFactory.java
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
+
+import java.util.Properties;
+
+/**
+ * A RateLimiterFactory that configures and creates a rate limiter.
+ */
+public class RateLimiterFactory {
+
+   /** Flag that indicates if ratelimiting is enabled. */
+   private static final String RATELIMITING_FLAG = 
"consumer.ratelimiting.enabled";
 
 Review comment:
   Personally, I prefer independent configurations if possible. I am wondering 
if the following configuration names would be clear enough:
   ```
   CONSUMER_MAX_BYTES_PER_SECOND_CONFIG="consumer.max.bytes.per.second"
   CONSUMER_MAX_RECORDS_PER_SECOND_CONFIG="consumer.max.records.per.second"
   ```
   In the future we can add configurations for the producers.
   
   The default value of the above configurations could be either be set to 
Long.MAX_VALUE, or -1 to indicate not throttling.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-11 Thread GitBox
becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka 
Connector] Add ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#discussion_r255776176
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
 ##
 @@ -69,6 +70,10 @@
 
private static final long serialVersionUID = 2324564345203409112L;
 
+   /** Configuration to set consumer prefix for ratelimiting. **/
+   private static final String CONSUMER_PREFIX = "kafka";
 
 Review comment:
   Do we need this prefix? Can all the source and sink share the same 
configuration name? If we do need the prefix, can we put this in somewhere like 
`FlinkKafkaConsumerBase`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-11 Thread GitBox
becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka 
Connector] Add ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#discussion_r255816157
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/config/RateLimiterFactory.java
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.config;
+
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
+
+import java.util.Properties;
+
+/**
+ * A RateLimiterFactory that configures and creates a rate limiter.
+ */
+public class RateLimiterFactory {
 
 Review comment:
   Can we move this class to a separate package so it can be shared by all the 
other connectors?
   For example, a `flink-connector-throttling` module in `flink-connectors`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-11 Thread GitBox
becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka 
Connector] Add ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#discussion_r255815625
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ##
 @@ -482,6 +502,49 @@ void 
reassignPartitions(List> newPartit
return new KafkaConsumer<>(kafkaProperties);
}
 
+   @VisibleForTesting
+   RateLimiter getRateLimiter() {
+   return rateLimiter;
+   }
+
+   // 
---
+   // Rate limiting methods
+   // 
---
+   /**
+*
+* @param records List of ConsumerRecords.
+* @return Total batch size in bytes, including key and value.
+*/
+   private int getRecordBatchSize(ConsumerRecords records) 
{
 
 Review comment:
   It is a little unfortunate that we have to make an additional iteration over 
the records to get the sizes. Another option is putting the throttling logic in 
the `AbstractFetcher`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-11 Thread GitBox
becketqin commented on a change in pull request #7679: [FLINK-11501][Kafka 
Connector] Add ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#discussion_r255813728
 
 

 ##
 File path: 
flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java
 ##
 @@ -22,12 +22,16 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.connectors.kafka.config.RateLimiterFactory;
 import 
org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
 import 
org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
 
+import 
org.apache.flink.shaded.guava18.com.google.common.util.concurrent.RateLimiter;
 
 Review comment:
   Do we want to directly import class from the shaded jar?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhijiangW commented on issue #7549: [FLINK-11403][network] Remove ResultPartitionConsumableNotifier from ResultPartition

2019-02-11 Thread GitBox
zhijiangW commented on issue #7549: [FLINK-11403][network] Remove 
ResultPartitionConsumableNotifier from ResultPartition
URL: https://github.com/apache/flink/pull/7549#issuecomment-462638845
 
 
   @azagrebin , thanks for your reviews!
   
   I think your suggestion makes sense and is worth trying. We wrap the 
`ResultPartition` created from `ShuffleService` and other fields related to 
consumption notification in a separate new class. Then the `RecordWriter` and 
`Task` only see this new wrapped class avoid of spreading tiny ones anywhere.
   
   I would have a try and re-submit the codes when ready.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhijiangW commented on issue #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor

2019-02-11 Thread GitBox
zhijiangW commented on issue #7631: [FLINK-11391][shuffle] Introduce 
PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
URL: https://github.com/apache/flink/pull/7631#issuecomment-462637123
 
 
   @azagrebin , thanks for your reviews!  :)
   I was also confused a bit by the points you mentioned during implementation. 
 I left some thoughts if I understood your suggestions correctly.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor

2019-02-11 Thread GitBox
zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] 
Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
URL: https://github.com/apache/flink/pull/7631#discussion_r255819481
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ShuffleDeploymentDescriptor.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.io.network.ConnectionID;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deployment descriptor for shuffle specific information.
+ */
+public class ShuffleDeploymentDescriptor implements Serializable {
 
 Review comment:
   It might need an explicit method of `getConnectionId` in the interface if to 
do? Because the ICDD might either see `UnknownShuffleDeploymentDescriptor` or 
`KnownShuffleDeploymentDescriptor` and it should provide the way of getting 
`ConnectionID` if `LocationType==Remote`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor

2019-02-11 Thread GitBox
zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] 
Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
URL: https://github.com/apache/flink/pull/7631#discussion_r255818212
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 ##
 @@ -52,30 +52,38 @@
/** The ID of the partition the input channel is going to consume. */
private final ResultPartitionID consumedPartitionId;
 
-   /** The location of the partition the input channel is going to 
consume. */
-   private final ResultPartitionLocation consumedPartitionLocation;
+   /** The location type of the partition the input channel is going to 
consume. */
+   private final LocationType locationType;
+
+   /** The connection to use to request the remote partition. */
+   private final Optional connectionId;
 
 Review comment:
   I also considered using `ShuffleDeploymentDescriptor` here to replace 
`ConnectionID` before. But there are two concerns in implementation:
   
   1.  In eager schedule mode when receiving all required slots, we might not 
assume the deployment sequence must be strict with topology sequence. That 
means the consumer execution deployment might be earlier than the producer 
execution. So in the process of `InputChannelDeploymentDescriptor#fromEdges`, 
we might not get cached SDD directly from producer execution. But we can 
generate `ConnectionID` based on other infos. Otherwise we must confirm the 
deployment sequence is from producer to consumer or generate producer's SDD 
during deploying consumer in `InputChannelDeploymentDescriptor#fromEdges`.
   
   2. I thought of introducing `UnknownShuffleDeploymentDescriptor` before, but 
from semantic aspect it is a bit redundant with `LocationType.Unknown`. In 
addition, it seems no specific usages like ` instanceof 
UnknownShuffleDeploymentDescriptor` in other processes.  The SDD should be 
generated by `ShuffleMaster` by design, but the special 
`UnknownShuffleDeploymentDescriptor` is generated only in the case of 
`LocationType.Unknown` which is not via `ShuffleMaster`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11558) Translate "Ecosystem" page into Chinese

2019-02-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11558:
---
Labels: pull-request-available  (was: )

> Translate "Ecosystem" page into Chinese
> ---
>
> Key: FLINK-11558
> URL: https://issues.apache.org/jira/browse/FLINK-11558
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Project Website
>Reporter: Jark Wu
>Assignee: Kaibo Zhou
>Priority: Major
>  Labels: pull-request-available
>
> Translate "Ecosystem" page into Chinese.
> The markdown file is located in: flink-web/ecosystem.zh.md
> The url link is: https://flink.apache.org/zh/ecosystem.html
> Please adjust the links in the page to Chinese pages when translating. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor

2019-02-11 Thread GitBox
zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] 
Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
URL: https://github.com/apache/flink/pull/7631#discussion_r255814019
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -813,24 +815,27 @@ TaskDeploymentDescriptor createDeploymentDescriptor(
boolean lazyScheduling = 
getExecutionGraph().getScheduleMode().allowLazyDeployment();
 
for (IntermediateResultPartition partition : 
resultPartitions.values()) {
-
List> consumers = 
partition.getConsumers();
-
+   int maxParallelism;
if (consumers.isEmpty()) {
//TODO this case only exists for test, 
currently there has to be exactly one consumer in real jobs!
-   
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(
-   partition,
-   
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
-   lazyScheduling));
+   maxParallelism = 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
} else {
Preconditions.checkState(1 == consumers.size(),
-   "Only one consumer supported in 
the current implementation! Found: " + consumers.size());
+   "Only one consumer supported in the 
current implementation! Found: " + consumers.size());
 
List consumer = consumers.get(0);
ExecutionJobVertex vertex = 
consumer.get(0).getTarget().getJobVertex();
-   int maxParallelism = vertex.getMaxParallelism();
-   
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, 
maxParallelism, lazyScheduling));
+   maxParallelism = vertex.getMaxParallelism();
}
+
+   PartitionShuffleDescriptor psd = 
PartitionShuffleDescriptor.from(targetSlot, executionId, partition, 
maxParallelism);
+
+   
producedPartitions.add(ResultPartitionDeploymentDescriptor.fromShuffleDescriptor(psd));
+   
getCurrentExecutionAttempt().cachePartitionShuffleDescriptor(partition.getIntermediateResult().getId(),
 psd);
 
 Review comment:
   From functional aspect, caching the `TaskDeploymentDescriptor` might also 
make sense. But I have other concerns:
   1. The structure of TDD is complicated and would take more memory if caching 
completely, such as unnecessary fields `serializedJobInformation`, 
`serializedTaskInformation`,etc.
   
   2. We might need adjust the current collection structure of 
`producedPartitions`, `inputGates` in TDD to map structure in order to find 
required PSD, SDD directly for other usages.
   
   3. If replacing the current three descriptors caches, we might not need the 
class of `PartialInputChannelDeploymentDescriptor` any more if I understand 
correctly. But I wonder there exists such scenarios that during deploying 
consumer execution, only some input channel descriptors are unknown. During 
sending partition infos we only want to send these unknown infos, so how can we 
distinguish them from all the cached producer's TDD?  In other words, the 
current cached `partialInputChannelDeploymentDescriptors` might be only a sub 
collection of all cached TDDs on producer side.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sunjincheng121 commented on issue #5923: [FLINK-9253][network] make the maximum floating buffers count channel-type independent

2019-02-11 Thread GitBox
sunjincheng121 commented on issue #5923: [FLINK-9253][network] make the maximum 
floating buffers count channel-type independent
URL: https://github.com/apache/flink/pull/5923#issuecomment-462629185
 
 
   Hi, @NicoK are you still following this PR? 
   Recently, @rmetzger and I are planning the release of Flink 1.6.4. Do you 
want put this PR into release 1.6.4?
   Best,
   Jincheng


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor

2019-02-11 Thread GitBox
zhijiangW commented on a change in pull request #7631: [FLINK-11391][shuffle] 
Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
URL: https://github.com/apache/flink/pull/7631#discussion_r255814019
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -813,24 +815,27 @@ TaskDeploymentDescriptor createDeploymentDescriptor(
boolean lazyScheduling = 
getExecutionGraph().getScheduleMode().allowLazyDeployment();
 
for (IntermediateResultPartition partition : 
resultPartitions.values()) {
-
List> consumers = 
partition.getConsumers();
-
+   int maxParallelism;
if (consumers.isEmpty()) {
//TODO this case only exists for test, 
currently there has to be exactly one consumer in real jobs!
-   
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(
-   partition,
-   
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
-   lazyScheduling));
+   maxParallelism = 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
} else {
Preconditions.checkState(1 == consumers.size(),
-   "Only one consumer supported in 
the current implementation! Found: " + consumers.size());
+   "Only one consumer supported in the 
current implementation! Found: " + consumers.size());
 
List consumer = consumers.get(0);
ExecutionJobVertex vertex = 
consumer.get(0).getTarget().getJobVertex();
-   int maxParallelism = vertex.getMaxParallelism();
-   
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, 
maxParallelism, lazyScheduling));
+   maxParallelism = vertex.getMaxParallelism();
}
+
+   PartitionShuffleDescriptor psd = 
PartitionShuffleDescriptor.from(targetSlot, executionId, partition, 
maxParallelism);
+
+   
producedPartitions.add(ResultPartitionDeploymentDescriptor.fromShuffleDescriptor(psd));
+   
getCurrentExecutionAttempt().cachePartitionShuffleDescriptor(partition.getIntermediateResult().getId(),
 psd);
 
 Review comment:
   From functional aspect, caching the `TaskDeploymentDescriptor` might also 
make sense. But I have other concerns:
   1. The structure of TDD is complicated and would take more memory if caching 
completely, such as unnecessary fields `serializedJobInformation`, 
`serializedTaskInformation`,etc.
   
   2. We might need adjust the current collection structure of 
`producedPartitions`, `inputGates` in TDD to map structure in order to find 
required PSD, SDD directly for other usages.
   
   3. If replacing the current three descriptors caches, we might not need the 
class of `PartialInputChannelDeploymentDescriptor` any more if I understand 
correctly. But I wonder there exists such scenarios that during deploying 
consumer execution, only some input channel descriptors are unknown. During 
sending partition infos we only want to send these unknown infos when consumer 
deployment, so how can we distinguish them from all the cached producer's TDD?  
In other words, the current cached `partialInputChannelDeploymentDescriptors` 
might be only a sub collection of cached TDDs on producer side.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11581) Add Chinese document contribution guideline

2019-02-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11581:
---
Labels: pull-request-available  (was: )

> Add Chinese document contribution guideline
> ---
>
> Key: FLINK-11581
> URL: https://issues.apache.org/jira/browse/FLINK-11581
> Project: Flink
>  Issue Type: Sub-task
>  Components: Project Website
>Reporter: Jark Wu
>Assignee: Forward Xu
>Priority: Major
>  Labels: pull-request-available
>
> Add a guideline to introduce how to contribute Chinese docs. For example, how 
> it works, how to add a new Chinese page.
> Maybe we can add a section to 
> https://flink.apache.org/contribute-documentation.html .



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11581) Add Chinese document contribution guideline

2019-02-11 Thread Forward Xu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11581?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Forward Xu reassigned FLINK-11581:
--

Assignee: Forward Xu

> Add Chinese document contribution guideline
> ---
>
> Key: FLINK-11581
> URL: https://issues.apache.org/jira/browse/FLINK-11581
> Project: Flink
>  Issue Type: Sub-task
>  Components: Project Website
>Reporter: Jark Wu
>Assignee: Forward Xu
>Priority: Major
>
> Add a guideline to introduce how to contribute Chinese docs. For example, how 
> it works, how to add a new Chinese page.
> Maybe we can add a section to 
> https://flink.apache.org/contribute-documentation.html .



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11555) Translate "Contributing Code" page into Chinese

2019-02-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11555:
---
Labels: pull-request-available  (was: )

> Translate "Contributing Code" page into Chinese
> ---
>
> Key: FLINK-11555
> URL: https://issues.apache.org/jira/browse/FLINK-11555
> Project: Flink
>  Issue Type: Sub-task
>  Components: chinese-translation, Project Website
>Reporter: Jark Wu
>Assignee: Forward Xu
>Priority: Major
>  Labels: pull-request-available
>
> Translate "Contributing Code" page into Chinese.
> The markdown file is located in: flink-web/contribute-code.zh.md
> The url link is: https://flink.apache.org/zh/contribute-code.html
> Please adjust the links in the page to Chinese pages when translating. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11581) Add Chinese document contribution guideline

2019-02-11 Thread Jark Wu (JIRA)
Jark Wu created FLINK-11581:
---

 Summary: Add Chinese document contribution guideline
 Key: FLINK-11581
 URL: https://issues.apache.org/jira/browse/FLINK-11581
 Project: Flink
  Issue Type: Sub-task
  Components: Project Website
Reporter: Jark Wu


Add a guideline to introduce how to contribute Chinese docs. For example, how 
it works, how to add a new Chinese page.

Maybe we can add a section to 
https://flink.apache.org/contribute-documentation.html .



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] liyafan82 opened a new pull request #7680: [FLINK-11421][Table API & SQL] Providing more compilation options for code-generated o…

2019-02-11 Thread GitBox
liyafan82 opened a new pull request #7680: [FLINK-11421][Table API & SQL] 
Providing more compilation options for code-generated o…
URL: https://github.com/apache/flink/pull/7680
 
 
   …perators (changes for stream jobs)
   
   ## What is the purpose of the change
   
   This is to support JIRA [FLINK-11421] Providing more compilation options for 
code-generated operators
   
   
   ## Brief change log
   
   *The main changes include*
 - *Provide the JCA compiler to compile generated code by Java Compiler 
API.*
 - *Support specifying the compilation option from the configuration file 
(flink-conf.yaml).*
 - *The default compilation option remains the same, by Janino. So it does 
not affect existing jobs, unless configured specifically.*
   
   
   ## Verifying this change
   
   
   This change is already covered by existing tests, such as 
*(flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SortITCase.scala)*.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (JavaDocs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] flinkbot commented on issue #7680: [FLINK-11421][Table API & SQL] Providing more compilation options for code-generated o…

2019-02-11 Thread GitBox
flinkbot commented on issue #7680: [FLINK-11421][Table API & SQL] Providing 
more compilation options for code-generated o…
URL: https://github.com/apache/flink/pull/7680#issuecomment-462593057
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❌ 1. The [description] looks good.
   * ❌ 2. There is [consensus] that the contribution should go into to Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The change fits into the overall [architecture].
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11501) Add a ratelimiting feature to the FlinkKafkaConsumer

2019-02-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11501:
---
Labels: pull-request-available  (was: )

> Add a ratelimiting feature to the FlinkKafkaConsumer
> 
>
> Key: FLINK-11501
> URL: https://issues.apache.org/jira/browse/FLINK-11501
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: pull-request-available
> Attachments: RateLimiting-1.png, Ratelimiting-2.png
>
>
> There are instances when a Flink job that reads from Kafka can read at a 
> significantly high throughput (particularly while processing a backlog) and 
> degrade the underlying Kafka cluster.
> While Kafka quotas are perhaps the best way to enforce this ratelimiting, 
> there are cases where such a setup is not available or easily enabled. In 
> such a scenario, ratelimiting on the FlinkKafkaConsumer is useful feature. 
> The approach is essentially involves using Guava's 
> [RateLimiter|https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html]
>  to ratelimit the bytes read from Kafka (in the 
> [KafkaConsumerThread|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java])
> More discussion here: 
> [https://lists.apache.org/thread.html/8140b759ba83f33a22d809887fd2d711f5ffe7069c888eb9b1142272@%3Cdev.flink.apache.org%3E]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version

2019-02-11 Thread GitBox
wujinhu commented on issue #7599: [FLINK-11442] upgrade oss sdk version
URL: https://github.com/apache/flink/pull/7599#issuecomment-462575544
 
 
   Thanks @greghogan  Would you pls help to merge this PR? It seems 
StefanRRichter is not available now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] flinkbot commented on issue #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-11 Thread GitBox
flinkbot commented on issue #7679: [FLINK-11501][Kafka Connector] Add 
ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679#issuecomment-462574731
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❌ 1. The [description] looks good.
   * ❌ 2. There is [consensus] that the contribution should go into to Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The change fits into the overall [architecture].
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] glaksh100 opened a new pull request #7679: [FLINK-11501][Kafka Connector] Add ratelimiting to Kafka consumer

2019-02-11 Thread GitBox
glaksh100 opened a new pull request #7679: [FLINK-11501][Kafka Connector] Add 
ratelimiting to Kafka consumer
URL: https://github.com/apache/flink/pull/7679
 
 
   ## What is the purpose of the change
   This pull request adds a ratelimiting feature to the Flink Kafka consumer. 
There are instances when a Flink job that reads from Kafka can read at a 
significantly high throughput (particularly while processing a backlog) and 
degrade the underlying Kafka cluster. While Kafka quotas are perhaps the best 
way to enforce this ratelimiting, there are cases where such a setup is not 
available or easily enabled. In such a scenario, ratelimiting on the 
FlinkKafkaConsumer is a useful feature. 
   
   ## Brief change log
 - This feature is set by using a feature flag - 
`kafka.consumer.ratelimiting.enabled`
 -  A`RateLimiterFactory` is used to configure and create a Guava 
[RateLimiter](https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html)
 with a desired rate.
 -  The `consumer.poll()` part of the `run()` loop in the 
`KafkaConsumerThread` is modularized into a `getRecordsFromKafka()` method.
 - The rate is controlled by setting the bytes received from Kafka as the 
parameter to the `acquire()` call. 

   ## Verifying this change
   This change added tests and can be verified as follows:
- Added a `testRateLimiting()` test in the `KafkaConsumerThreadTest` class.
- Manually verified the change using a test application and screenshots of 
results are added 
[here](https://issues.apache.org/jira/browse/FLINK-11501?focusedCommentId=16762965=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16762965).
 
   
   ## Does this pull request potentially affect one of the following parts:
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] HuangZhenQiu commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-11 Thread GitBox
HuangZhenQiu commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r255752753
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -395,12 +405,20 @@ public void onContainersAllocated(List 
containers) {

nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
} catch (Throwable t) {
log.error("Could not start 
TaskManager in container {}.", container.getId(), t);
-
// release the failed container

workerNodeMap.remove(resourceId);

resourceManagerClient.releaseAssignedContainer(container.getId());
-   // and ask for a new one
-   
requestYarnContainerIfRequired();
+   log.error("Could not start 
TaskManager in container {}.", container.getId(), t);
+   recordFailure();
+   if (shouldRejectRequests()) {
+   
rejectAllPendingSlotRequests(new MaximumFailedTaskManagerExceedingException(
+   
String.format("Maximum number of failed container %d in interval %s"
+   
+ "is detected in Resource Manager", maximumFailureTaskExecutorPerInternal,
+   
failureInterval.toString()), t));
 
 Review comment:
   done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-9816) Support Netty configuration to enable an openSSL-based SslEngine

2019-02-11 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9816:
---
Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-11579

> Support Netty configuration to enable an openSSL-based SslEngine
> 
>
> Key: FLINK-9816
> URL: https://issues.apache.org/jira/browse/FLINK-9816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> Since a while now, Netty does not only support the JDK's {{SSLEngine}} but 
> also implements one based on openSSL which, according to 
> https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly 
> faster. We should add support for using that engine instead.
> This ticket is for adding the necessary parts to configure and setup an 
> arbitrary Netty-supported SslEngine.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11453) Support SliceStream with forwardable pane info

2019-02-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11453:
---
Labels: pull-request-available  (was: )

> Support SliceStream with forwardable pane  info
> ---
>
> Key: FLINK-11453
> URL: https://issues.apache.org/jira/browse/FLINK-11453
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>  Labels: pull-request-available
>
> Support slicing operation that produces slicing:
> {code:java}
> val slicedStream: SlicedStream = inputStream
>   .keyBy("key")
>   .sliceWindow(Time.seconds(5L))   // new “slice window” concept: to 
> combine 
>// tumble results based on discrete
>// non-overlapping windows.
>   .aggregate(aggFunc)
> {code}
> {{SlicedStream}} will produce results that exposes current {{WindowOperator}} 
> internal state {{InternalAppendingState}}, which can be 
> later applied with {{WindowFunction}} separately in another operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flinkbot commented on issue #7678: [FLINK-11453][DataStreamAPI] Support SliceStream with forwardable pane info using slice assigner, operator and stream

2019-02-11 Thread GitBox
flinkbot commented on issue #7678: [FLINK-11453][DataStreamAPI] Support 
SliceStream with forwardable pane info using slice assigner, operator and stream
URL: https://github.com/apache/flink/pull/7678#issuecomment-462519667
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❌ 1. The [description] looks good.
   * ❌ 2. There is [consensus] that the contribution should go into to Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The change fits into the overall [architecture].
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] walterddr opened a new pull request #7678: [FLINK-11453][DataStreamAPI] Support SliceStream with forwardable pane info using slice assigner, operator and stream

2019-02-11 Thread GitBox
walterddr opened a new pull request #7678: [FLINK-11453][DataStreamAPI] Support 
SliceStream with forwardable pane info using slice assigner, operator and stream
URL: https://github.com/apache/flink/pull/7678
 
 
   
   
   
   **(The sections below can be removed for hotfixes of typos)**
   -->
   
   ## What is the purpose of the change
   
   This PR introduces a new `SlicedStream` abstract operation, which creates a 
resulting stream of the intermediate results buffered in the internal state of 
`WindowedOperator`. 
   It creates a `Slice` data type as a result to contain all necessary 
information of a pane slice.
   With this API. further processing is possible for operations:
   ```
   val slicedStream: slicedStream = inputStream
 .keyBy("key")
 .sliceWindow(Time.seconds(5L)) 
 .aggregate(aggFunc)
   val resultStream = slicedStream
   .window(Time.seconds(5000L))
   .aggregate(aggFunc)
   ```
   Is possible to create much more efficient sliding window operation, where 
elements won't have to be duplicated into each window.
   
   ## Brief change log
   
 - Added `SliceAssigner` that assigns elements into zero or one window 
(a.k.a. the "slice")
 - Modified `KeyedStream` and `WindowedStream` API to incorporate the 
creation of `SlicedStream`
 - Added `SlicedStream` concept that can emit slicing results.
 - Created special operators `SliceOperator` and `IterableSliceOperator` to 
process the intermediate results.
 - Added in TumblingEvent/ProcessingTimeSliceAssigner as an example.
   
   
   ## Verifying this change
   
   - This change is already covered by multiple tests for backward 
compatibility 
   - This change added tests and can be verified as follows:
 - Added integration tests for end-to-end processing for reduce, 
aggregation, and general apply
 - Added translation tests in scala for verifying `SlicedStream` API 
conversion chained with `KeyedStream`.
 - Added integration tests specifically tested serialization and 
deserialization to/from state snapshot.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): yes
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? not yet, await review
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11580) Provide a dynamically-linked netty-tc

2019-02-11 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-11580:
---

 Summary: Provide a dynamically-linked netty-tc
 Key: FLINK-11580
 URL: https://issues.apache.org/jira/browse/FLINK-11580
 Project: Flink
  Issue Type: Sub-task
  Components: flink-shaded.git, Network
Reporter: Nico Kruber
Assignee: Nico Kruber


In order to have an openSSL-based SSL engine available, we need a shaded 
netty-tc version in the classpath which relies on openSSL libraries from the 
system it is running on.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11579) Support Netty SslEngine based on openSSL

2019-02-11 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-11579:
---

 Summary: Support Netty SslEngine based on openSSL
 Key: FLINK-11579
 URL: https://issues.apache.org/jira/browse/FLINK-11579
 Project: Flink
  Issue Type: Improvement
  Components: Network
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.8.0


Since a while now, Netty does not only support the JDK's SSLEngine but also 
implements one based on openSSL which, according to 
https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly 
faster. We should add support for using that engine instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9816) Support Netty configuration to enable an openSSL-based SslEngine

2019-02-11 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9816:
---
Summary: Support Netty configuration to enable an openSSL-based SslEngine  
(was: Support Netty SslEngine based on openSSL)

> Support Netty configuration to enable an openSSL-based SslEngine
> 
>
> Key: FLINK-9816
> URL: https://issues.apache.org/jira/browse/FLINK-9816
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> Since a while now, Netty does not only support the JDK's {{SSLEngine}} but 
> also implements one based on openSSL which, according to 
> https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly 
> faster. We should add support for using that engine instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9816) Support Netty configuration to enable an openSSL-based SslEngine

2019-02-11 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9816?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9816:
---
Description: 
Since a while now, Netty does not only support the JDK's {{SSLEngine}} but also 
implements one based on openSSL which, according to 
https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly 
faster. We should add support for using that engine instead.

This ticket is for adding the necessary parts to configure and setup an 
arbitrary Netty-supported SslEngine.

  was:Since a while now, Netty does not only support the JDK's {{SSLEngine}} 
but also implements one based on openSSL which, according to 
https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly 
faster. We should add support for using that engine instead.


> Support Netty configuration to enable an openSSL-based SslEngine
> 
>
> Key: FLINK-9816
> URL: https://issues.apache.org/jira/browse/FLINK-9816
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
>  Labels: pull-request-available
>
> Since a while now, Netty does not only support the JDK's {{SSLEngine}} but 
> also implements one based on openSSL which, according to 
> https://netty.io/wiki/requirements-for-4.x.html#wiki-h4-4 is significantly 
> faster. We should add support for using that engine instead.
> This ticket is for adding the necessary parts to configure and setup an 
> arbitrary Netty-supported SslEngine.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flinkbot edited a comment on issue #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor

2019-02-11 Thread GitBox
flinkbot edited a comment on issue #7631: [FLINK-11391][shuffle] Introduce 
PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
URL: https://github.com/apache/flink/pull/7631#issuecomment-459738324
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❌ 1. The [description] looks good.
   * ❌ 2. There is [consensus] that the contribution should go into to Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The change fits into the overall [architecture].
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor

2019-02-11 Thread GitBox
azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] 
Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
URL: https://github.com/apache/flink/pull/7631#discussion_r255616682
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 ##
 @@ -52,30 +52,38 @@
/** The ID of the partition the input channel is going to consume. */
private final ResultPartitionID consumedPartitionId;
 
-   /** The location of the partition the input channel is going to 
consume. */
-   private final ResultPartitionLocation consumedPartitionLocation;
+   /** The location type of the partition the input channel is going to 
consume. */
+   private final LocationType locationType;
+
+   /** The connection to use to request the remote partition. */
+   private final Optional connectionId;
 
 Review comment:
   I thought we would just have here `ShuffleDeploymentDescriptor` instead of 
`ConnectionID`. SDD also contains `ConnectionID`. If `LocationType.Unknown` is 
unknown, SDD field could be just special singleton implementation of 
`ShuffleDeploymentDescriptor` -> `UnknownShuffleDeploymentDescriptor`, or is it 
coming later?
   Also, in `ResultPartitionDeploymentDescriptor`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor

2019-02-11 Thread GitBox
azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] 
Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
URL: https://github.com/apache/flink/pull/7631#discussion_r255629226
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/deployment/ShuffleDeploymentDescriptor.java
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.runtime.io.network.ConnectionID;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deployment descriptor for shuffle specific information.
+ */
+public class ShuffleDeploymentDescriptor implements Serializable {
 
 Review comment:
   I think eventually it needs to be an interface, probably an empty one. This 
one could stay an implementation for the default shuffle master. Also, special 
`UnknownShuffleDeploymentDescriptor` could extend the interface.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor

2019-02-11 Thread GitBox
azagrebin commented on a change in pull request #7631: [FLINK-11391][shuffle] 
Introduce PartitionShuffleDescriptor and ShuffleDeploymentDescriptor
URL: https://github.com/apache/flink/pull/7631#discussion_r255611695
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -813,24 +815,27 @@ TaskDeploymentDescriptor createDeploymentDescriptor(
boolean lazyScheduling = 
getExecutionGraph().getScheduleMode().allowLazyDeployment();
 
for (IntermediateResultPartition partition : 
resultPartitions.values()) {
-
List> consumers = 
partition.getConsumers();
-
+   int maxParallelism;
if (consumers.isEmpty()) {
//TODO this case only exists for test, 
currently there has to be exactly one consumer in real jobs!
-   
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(
-   partition,
-   
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM,
-   lazyScheduling));
+   maxParallelism = 
KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
} else {
Preconditions.checkState(1 == consumers.size(),
-   "Only one consumer supported in 
the current implementation! Found: " + consumers.size());
+   "Only one consumer supported in the 
current implementation! Found: " + consumers.size());
 
List consumer = consumers.get(0);
ExecutionJobVertex vertex = 
consumer.get(0).getTarget().getJobVertex();
-   int maxParallelism = vertex.getMaxParallelism();
-   
producedPartitions.add(ResultPartitionDeploymentDescriptor.from(partition, 
maxParallelism, lazyScheduling));
+   maxParallelism = vertex.getMaxParallelism();
}
+
+   PartitionShuffleDescriptor psd = 
PartitionShuffleDescriptor.from(targetSlot, executionId, partition, 
maxParallelism);
+
+   
producedPartitions.add(ResultPartitionDeploymentDescriptor.fromShuffleDescriptor(psd));
+   
getCurrentExecutionAttempt().cachePartitionShuffleDescriptor(partition.getIntermediateResult().getId(),
 psd);
 
 Review comment:
   Would it work if the complete `TaskDeploymentDescriptor` would be just 
cached as volatile field in `Execution`? Maybe we would not need any of three 
descriptors caches, what do think?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-10721) kafkaFetcher runFetchLoop throw exception will cause follow-up code not execute in FlinkKafkaConsumerBase run method

2019-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-10721.
-
Resolution: Fixed

> kafkaFetcher runFetchLoop throw exception will cause follow-up code not 
> execute in FlinkKafkaConsumerBase run method 
> -
>
> Key: FLINK-10721
> URL: https://issues.apache.org/jira/browse/FLINK-10721
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.6.2
>Reporter: zhaoshijie
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
>
> In FlinkKafkaConsumerBase run method on line 721(master branch), if 
> kafkaFetcher.runFetchLoop() throw exception(by discoveryLoopThread throw 
> exception then finally execute cancel method, cancel method will execute 
> kafkaFetcher.cancel, this implemented Kafka09Fetcher will execute 
> handover.close, then result in handover.pollNext throw ClosedException),then 
> next code will not execute,especially discoveryLoopError not be throwed,so, 
> real culprit exception will be Swallowed.
> failed log like this:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Shoud we modify it as follows?
> {code:java}
> try {
>   kafkaFetcher.runFetchLoop();
>   } catch (Exception e) {
>   // if discoveryLoopErrorRef not null ,we should 
> throw real culprit exception
>   if (discoveryLoopErrorRef.get() != null){
>   throw new 
> RuntimeException(discoveryLoopErrorRef.get());
>   } else {
>   throw e;
>   }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10721) kafkaFetcher runFetchLoop throw exception will cause follow-up code not execute in FlinkKafkaConsumerBase run method

2019-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765226#comment-16765226
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-10721:
-

I think this is indirectly fixed by the life cycle rework of the 
FlinkKafkaConsumerBase in FLINK-10774.
Will mark this as resolved for now, please re-open if you disagree.

> kafkaFetcher runFetchLoop throw exception will cause follow-up code not 
> execute in FlinkKafkaConsumerBase run method 
> -
>
> Key: FLINK-10721
> URL: https://issues.apache.org/jira/browse/FLINK-10721
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.6.2
>Reporter: zhaoshijie
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.6.4, 1.7.2, 1.8.0
>
>
> In FlinkKafkaConsumerBase run method on line 721(master branch), if 
> kafkaFetcher.runFetchLoop() throw exception(by discoveryLoopThread throw 
> exception then finally execute cancel method, cancel method will execute 
> kafkaFetcher.cancel, this implemented Kafka09Fetcher will execute 
> handover.close, then result in handover.pollNext throw ClosedException),then 
> next code will not execute,especially discoveryLoopError not be throwed,so, 
> real culprit exception will be Swallowed.
> failed log like this:
> {code:java}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply$mcV$sp(JobManager.scala:1229)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$10.apply(JobManager.scala:1172)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.streaming.connectors.kafka.internal.Handover$ClosedException
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Handover.close(Handover.java:180)
>   at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.cancel(Kafka09Fetcher.java:174)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:753)
>   at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:695)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Shoud we modify it as follows?
> {code:java}
> try {
>   kafkaFetcher.runFetchLoop();
>   } catch (Exception e) {
>   // if discoveryLoopErrorRef not null ,we should 
> throw real culprit exception
>   if (discoveryLoopErrorRef.get() != null){
>   throw new 
> RuntimeException(discoveryLoopErrorRef.get());
>   } else {
>   throw e;
>   }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #7576: [FLINK-11046][elasticsearch] Fix ElasticSearch6Connector thread blocked when index failed with retry

2019-02-11 Thread GitBox
asfgit closed pull request #7576: [FLINK-11046][elasticsearch] Fix 
ElasticSearch6Connector thread blocked when index failed with retry
URL: https://github.com/apache/flink/pull/7576
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2019-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-11046.
---
Resolution: Fixed

Merged.

1.8.0: 3aa92af9a68015f240c6dcc46313202e78ea5883
1.7.2: 2f522271abf03c5584612076b549c98d76a07f0f

> ElasticSearch6Connector cause thread blocked when index failed with retry
> -
>
> Key: FLINK-11046
> URL: https://issues.apache.org/jira/browse/FLINK-11046
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector
>Affects Versions: 1.6.2
>Reporter: luoguohao
>Assignee: xueyu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.2, 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> When i'm using es6 sink to index into es, bulk process with some exception 
> catched, and  i trying to reindex the document with the call 
> `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` 
> method, but things goes incorrect. The call thread stuck there, and with the 
> thread dump, i saw the `bulkprocessor` object was locked by other thread. 
> {code:java}
> public interface ActionRequestFailureHandler extends Serializable {
>  void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
> RequestIndexer indexer) throws Throwable;
> }
> {code}
> After i read the code implemented in the `indexer.add(action)`, i find that 
> `synchronized` is needed on each add operation.
> {code:java}
> private synchronized void internalAdd(DocWriteRequest request, @Nullable 
> Object payload) {
>   ensureOpen();
>   bulkRequest.add(request, payload);
>   executeIfNeeded();
> }
> {code}
> And, at i also noticed that `bulkprocessor` object would also locked in the 
> bulk process thread. 
> the bulk process operation is in the following code:
> {code:java}
> public void execute(BulkRequest bulkRequest, long executionId) {
> Runnable toRelease = () -> {};
> boolean bulkRequestSetupSuccessful = false;
> try {
> listener.beforeBulk(executionId, bulkRequest);
> semaphore.acquire();
> toRelease = semaphore::release;
> CountDownLatch latch = new CountDownLatch(1);
> retry.withBackoff(consumer, bulkRequest, new 
> ActionListener() {
> @Override
> public void onResponse(BulkResponse response) {
> try {
> listener.afterBulk(executionId, bulkRequest, response);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> @Override
> public void onFailure(Exception e) {
> try {
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> }, Settings.EMPTY);
> bulkRequestSetupSuccessful = true;
>if (concurrentRequests == 0) {
>latch.await();
> }
> } catch (InterruptedException e) {
> Thread.currentThread().interrupt();
> logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
> cancelled.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } catch (Exception e) {
> logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
> request {}.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> if (bulkRequestSetupSuccessful == false) {  // if we fail on 
> client.bulk() release the semaphore
> toRelease.run();
> }
> }
> }
> {code}
> As the read line i marked above, i think, that's the reason why the retry 
> operation thread was block, because the the bulk process thread never release 
> the lock on `bulkprocessor`.  and, i also trying to figure out why the field 
> `concurrentRequests` was set to zero. And i saw the the initialize for 
> bulkprocessor in class `ElasticsearchSinkBase`:
> {code:java}
> protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
>  ...
>  BulkProcessor.Builder bulkProcessorBuilder =  
> callBridge.createBulkProcessorBuilder(client, listener);
>  // This makes flush() blocking
>  bulkProcessorBuilder.setConcurrentRequests(0);
>  
>  ...
>  return bulkProcessorBuilder.build();
> }
> {code}
>  this field value was set to zero explicitly. So, all things seems to make 
> sense, but i still wonder why the retry operation is not in the same thread 
> as the bulk process execution, after i read the code, `bulkAsync` method 
> might be the last puzzle.
> {code:java}
> @Override
> public BulkProcessor.Builder 

[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-11 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r255613841
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 ##
 @@ -1430,6 +1430,120 @@ public void 
testReleasingTaskExecutorIfNoMoreSlotsRegistered() throws Exception
}
}
 
+   @Nonnull
+   private TestingResourceManagerGateway 
createAndRegisterTestingResourceManagerGateway() {
+   final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+   
rpcService.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+   
rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(),
 testingResourceManagerGateway.getFencingToken().toUUID());
+   return testingResourceManagerGateway;
+   }
+
+   /**
+* Tests that the job execution is failed if the TaskExecutor 
disconnects from the
+* JobMaster.
+*/
+   @Test
+   public void testJobFailureWhenGracefulTaskExecutorTermination() throws 
Exception {
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> heartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
jobMasterGateway.disconnectTaskManager(
+   localTaskManagerLocation.getResourceID(),
+   new FlinkException("Test disconnectTaskManager 
exception.")),
+   (jobMasterGateway, resourceID) -> ignored -> {});
+   }
+
+   @Test
+   public void testJobFailureWhenTaskExecutorHeartbeatTimeout() throws 
Exception {
+   final AtomicBoolean respondToHeartbeats = new 
AtomicBoolean(true);
+   runJobFailureWhenTaskExecutorTerminatesTest(
+   () -> fastHeartbeatServices,
+   (localTaskManagerLocation, jobMasterGateway) -> 
respondToHeartbeats.set(false),
+   (jobMasterGateway, taskManagerResourceId) -> resourceId 
-> {
+   if (respondToHeartbeats.get()) {
+   
jobMasterGateway.heartbeatFromTaskManager(taskManagerResourceId, new 
AccumulatorReport(Collections.emptyList()));
+   }
+   }
+   );
+   }
+
+   private void runJobFailureWhenTaskExecutorTerminatesTest(
+   Supplier heartbeatSupplier,
+   BiConsumer 
jobReachedRunningState,
+   BiFunction> 
heartbeatConsumerFunction) throws Exception {
+   final JobGraph jobGraph = createSingleVertexJobGraph();
+   final TestingOnCompletionActions onCompletionActions = new 
TestingOnCompletionActions();
+   final JobMaster jobMaster = createJobMaster(
+   new Configuration(),
+   jobGraph,
+   haServices,
+   new TestingJobManagerSharedServicesBuilder().build(),
+   heartbeatSupplier.get(),
+   onCompletionActions);
+
+   createAndRegisterTestingResourceManagerGateway();
+
+   try {
+   jobMaster.start(jobMasterId).get();
+
+   final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+   final LocalTaskManagerLocation taskManagerLocation = 
new LocalTaskManagerLocation();
+   final CompletableFuture 
taskDeploymentFuture = new CompletableFuture<>();
+   final TestingTaskExecutorGateway taskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+   
.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
+   
taskDeploymentFuture.complete(taskDeploymentDescriptor.getExecutionAttemptId());
+   return 
CompletableFuture.completedFuture(Acknowledge.get());
+   })
+   
.setHeartbeatJobManagerConsumer(heartbeatConsumerFunction.apply(jobMasterGateway,
 taskManagerLocation.getResourceID()))
+   .createTestingTaskExecutorGateway();
+   
rpcService.registerGateway(taskExecutorGateway.getAddress(), 
taskExecutorGateway);
+
+   
jobMasterGateway.registerTaskManager(taskExecutorGateway.getAddress(), 
taskManagerLocation, testingTimeout).get();
+   final SlotOffer slotOffer = new SlotOffer(new 
AllocationID(), 0, ResourceProfile.UNKNOWN);
+   final Collection slotOffers = 
jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), 

[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-11 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r255593757
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 ##
 @@ -945,4 +954,12 @@ public void onFatalError(Throwable exception) {
closeAsync();
}
}
+
+   private class TerminatingFatalErrorHandlerFactory {
+
+   @GuardedBy("lock")
+   private TerminatingFatalErrorHandler create() {
+   return new 
TerminatingFatalErrorHandler(taskManagers.size());
 
 Review comment:
   This deserves a comment. We pass `taskmanagers.size()` but the constructor 
expects an index. One might think that this throws 
`ArrayIndexOutOfBoundsException`.
   ```
synchronized (lock) {
taskManagers.get(index).shutDown();
}
   ```
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-11 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r255589778
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 ##
 @@ -696,34 +686,38 @@ protected ResourceManagerRunner startResourceManager(
return resourceManagerRunner;
}
 
-   protected TaskExecutor[] startTaskManagers(
-   Configuration configuration,
-   HighAvailabilityServices haServices,
-   HeartbeatServices heartbeatServices,
-   MetricRegistry metricRegistry,
-   BlobCacheService blobCacheService,
-   int numTaskManagers,
-   RpcServiceFactory rpcServiceFactory) throws Exception {
+   @GuardedBy("lock")
+   private void startTaskManagers() throws Exception {
 
 Review comment:
   Should be moved to 
   ```
// 

//  Internal methods
// 

   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-11 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r255600139
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniCluster.java
 ##
 @@ -66,6 +67,17 @@ public TestingMiniCluster(TestingMiniClusterConfiguration 
miniClusterConfigurati
return super.getDispatcherResourceManagerComponents();
}
 
+   @Nonnull
+   @Override
+   public CompletableFuture terminateTaskExecutor(int index) {
+   return super.terminateTaskExecutor(index);
+   }
+
+   @Override
+   public void startTaskExecutor(boolean localCommunication) throws 
Exception {
 
 Review comment:
   Doesn't look like user friendly API. Shouldn't `MiniCluster` set this flag 
depending on the configuration? Wouldn't it be enough to expose a signature 
such as `public void startTaskExecutor()`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-11 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r255600846
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 ##
 @@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration tests for the {@link TaskExecutor}.
+ */
+public class TaskExecutorITCase extends TestLogger {
+
+   private static final Duration TESTING_TIMEOUT = Duration.ofMinutes(2L);
+   private static final int NUM_TMS = 2;
+   private static final int SLOTS_PER_TM = 2;
+   private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
+
+   private TestingMiniCluster miniCluster;
+
+   @Before
+   public void setup() throws Exception  {
+   miniCluster = new TestingMiniCluster(
+   new TestingMiniClusterConfiguration.Builder()
+   .setNumTaskManagers(NUM_TMS)
+   .setNumSlotsPerTaskManager(SLOTS_PER_TM)
+   .build(),
+   null);
+
+   miniCluster.start();
+   }
+
+   @After
+   public void teardown() throws Exception {
+   if (miniCluster != null) {
+   miniCluster.close();
+   }
+   }
+
+   /**
+* Tests that a job will be re-executed if a new TaskExecutor joins the 
cluster.
+*/
+   @Test
+   public void testNewTaskExecutorJoinsCluster() throws Exception {
+
+   final Deadline deadline = Deadline.fromNow(TESTING_TIMEOUT);
 
 Review comment:
   Declaration of `deadline` is too early. Should be closer to 
`waitUntilCondition`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-11 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r255591538
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 ##
 @@ -738,6 +732,26 @@ protected ResourceManagerRunner startResourceManager(
//  Internal methods
// 

 
+   @GuardedBy("lock")
+   private Collection> 
terminateTaskExecutor() {
 
 Review comment:
   `terminateTaskExecutors`? (plural)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-11 Thread GitBox
GJL commented on a change in pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#discussion_r255594841
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 ##
 @@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.TestingMiniCluster;
+import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration tests for the {@link TaskExecutor}.
+ */
+public class TaskExecutorITCase extends TestLogger {
+
+   private static final Duration TESTING_TIMEOUT = Duration.ofMinutes(2L);
+   private static final int NUM_TMS = 2;
+   private static final int SLOTS_PER_TM = 2;
+   private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
+
+   private TestingMiniCluster miniCluster;
+
+   @Before
+   public void setup() throws Exception  {
+   miniCluster = new TestingMiniCluster(
+   new TestingMiniClusterConfiguration.Builder()
+   .setNumTaskManagers(NUM_TMS)
+   .setNumSlotsPerTaskManager(SLOTS_PER_TM)
+   .build(),
+   null);
+
+   miniCluster.start();
+   }
+
+   @After
+   public void teardown() throws Exception {
+   if (miniCluster != null) {
+   miniCluster.close();
+   }
+   }
+
+   /**
+* Tests that a job will be re-executed if a new TaskExecutor joins the 
cluster.
+*/
+   @Test
+   public void testNewTaskExecutorJoinsCluster() throws Exception {
+
+   final Deadline deadline = Deadline.fromNow(TESTING_TIMEOUT);
+
+   final JobGraph jobGraph = createJobGraph(PARALLELISM);
+
+   miniCluster.submitJob(jobGraph).get();
+
+   final CompletableFuture jobResultFuture = 
miniCluster.requestJobResult(jobGraph.getJobID());
+
+   assertThat(jobResultFuture.isDone(), is(false));
+
+   CommonTestUtils.waitUntilCondition(
+   jobIsRunning(() -> 
miniCluster.getExecutionGraph(jobGraph.getJobID())),
+   deadline,
+   20L);
+
+   // kill one TaskExecutor which should fail the job execution
+   miniCluster.terminateTaskExecutor(0);
+
+   final JobResult jobResult = jobResultFuture.get();
+
+   assertThat(jobResult.isSuccess(), is(false));
+
+   miniCluster.startTaskExecutor(false);
+
+   BlockingOperator.unblock();
+
+   miniCluster.submitJob(jobGraph).get();
+
+   miniCluster.requestJobResult(jobGraph.getJobID()).get();
+   }
+
+   private SupplierWithException 
jobIsRunning(Supplier> 
executionGraphFutureSupplier) {
+   final Predicate allExecutionsRunning = 

[jira] [Assigned] (FLINK-11577) Check and port StackTraceSampleCoordinatorITCase to new code base if necessary

2019-02-11 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-11577:


Assignee: (was: Gary Yao)

> Check and port StackTraceSampleCoordinatorITCase to new code base if necessary
> --
>
> Key: FLINK-11577
> URL: https://issues.apache.org/jira/browse/FLINK-11577
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Gary Yao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Check and port {{StackTraceSampleCoordinatorITCase}} to new code base if 
> necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #7637: [FLINK-11511] Remove legacy class JobAttachmentClientActor

2019-02-11 Thread GitBox
asfgit closed pull request #7637: [FLINK-11511] Remove legacy class 
JobAttachmentClientActor
URL: https://github.com/apache/flink/pull/7637
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11511) Remove legacy class JobAttachmentClientActor

2019-02-11 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-11511.

Resolution: Fixed

Fixed via ef2edd09d32b988a5ac0209787d28e7a48b79ddb

> Remove legacy class JobAttachmentClientActor
> 
>
> Key: FLINK-11511
> URL: https://issues.apache.org/jira/browse/FLINK-11511
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Remove legacy class JobAttachmentClientActor



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] rehevkor5 commented on a change in pull request #7672: [FLINK-11568][kinesis] Don't obscure important Kinesis exceptions

2019-02-11 Thread GitBox
rehevkor5 commented on a change in pull request #7672: [FLINK-11568][kinesis] 
Don't obscure important Kinesis exceptions
URL: https://github.com/apache/flink/pull/7672#discussion_r255585813
 
 

 ##
 File path: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcherTest.java
 ##
 @@ -811,4 +818,63 @@ protected long getCurrentTimeMillis() {
Assert.assertTrue("idle, no watermark", watermarks.isEmpty());
}
 
+   @Test
+   public void 
testOriginalExceptionIsPreservedWhenInterruptedDuringShutdown() throws 
Exception {
+   final String stream = "fakeStream";
+
+   Map>> streamsToShardQueues = 
new HashMap<>();
+   final LinkedBlockingQueue queue = new 
LinkedBlockingQueue<>(10);
+   queue.put("item1");
+   streamsToShardQueues.put(stream, 
Collections.singletonList(queue));
+
+   final AlwaysThrowsDeserializationSchema deserializationSchema = 
new AlwaysThrowsDeserializationSchema();
+   final KinesisProxyInterface fakeKinesis =
+   
FakeKinesisBehavioursFactory.blockingQueueGetRecords(streamsToShardQueues);
+
+   final 
TestableKinesisDataFetcherForShardConsumerException fetcher = new 
TestableKinesisDataFetcherForShardConsumerException<>(
+   Collections.singletonList(stream),
+   new TestSourceContext<>(),
+   TestUtils.getStandardProperties(),
+   new 
KinesisDeserializationSchemaWrapper<>(deserializationSchema),
+   10,
+   2,
+   new AtomicReference<>(),
+   new LinkedList<>(),
+   new HashMap<>(),
+   fakeKinesis);
+
+   final DummyFlinkKinesisConsumer consumer = new 
DummyFlinkKinesisConsumer<>(
 
 Review comment:
   My IDE automatically marks things `final` wherever possible, to prevent 
writing accidental mutable code, but I will happily remove them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not be migrated to FlinkKafkaProducer

2019-02-11 Thread GitBox
pnowojski commented on issue #7405: [FLINK-11249] FlinkKafkaProducer011 can not 
be migrated to FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/7405#issuecomment-462391160
 
 
   Thanks @EAlexRojas for the investigation. I am/was also in the process of 
discovering/fixing the same problems that you have mentioned and writing the 
tests for that. So far I have rewritten & fixed the tests provided by 
@tvielgouarin and encountered the same errors.
   
   TLDR;
   I'm thinking how to solve this situation, but I'm afraid it won't make it to 
1.8...
   
   Full story:
   Regarding the 0.11 vs 2.0 dependency, having both of them in the same class 
path won't work because of the dependency convergence, but it doesn't have to. 
Committing/aborting 0.11 transactions with 2.0 connector should work fine, so 
that shouldn't be an issue.
   
   Real problem is that `FlinkKafkaProducer` and `FlinkKafkaProducer011` have 
different names and they defined static classes ` NextTransactionalIdHint`, 
`KafkaTransactionState` and `KafkaTransactionContext` inside the parent 
classes. This is causing incompatibility problems since for example 
`FlinkKafkaProducer011.KafkaTransactionState` and 
`FlinkKafkaProducer.KafkaTransactionState` are treated as completely 
incompatible classes, despite being identical.
   
   It can probably be solved by:
   1. custom serialization logic, like keeping a fake/dummy 
`FlinkKafkaProducer011.XXXSerializer.XXXSerializerSnapshot` classes in the 
universal connector, as entry points for the deserialization
   2. Add a "force skip class compatibility check" flag to the current 
serialization stack. After all serialized binary data are exactly the same in 
all of those cases. This is work in progress by @tzulitai, this might happen in 
time for 1.8 release.
   3. Add a more powerful state migration function, that would be able to 
change type of a field/class. This is also on our road map, but won't happen in 
1.8.
   
   Either way, unfortunately I'm away for next two weeks and I can not solve 
this issue before 1.8 feature freeze. This fix will have to wait for 1.9 
release.
   
   I have implemented working regression tests for state compatibility between 
Flink versions: https://github.com/apache/flink/pull/7677
   Test for migration from 0.11 to universal connector is also easy to 
implement:
   
https://github.com/pnowojski/flink/tree/kafka-migration-0.11-to-universal-not-working
   But I didn't have time to make it work (as described above).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] flinkbot commented on issue #7677: [FLINK-11249][kafka] Add migration tests for FlinkKafkaProdcuer and FlinkKafkaProducer011

2019-02-11 Thread GitBox
flinkbot commented on issue #7677: [FLINK-11249][kafka] Add migration tests for 
FlinkKafkaProdcuer and FlinkKafkaProducer011
URL: https://github.com/apache/flink/pull/7677#issuecomment-462389051
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❌ 1. The [description] looks good.
   * ❌ 2. There is [consensus] that the contribution should go into to Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The change fits into the overall [architecture].
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski opened a new pull request #7677: [FLINK-11249][kafka] Add migration tests for FlinkKafkaProdcuer and FlinkKafkaProducer011

2019-02-11 Thread GitBox
pnowojski opened a new pull request #7677: [FLINK-11249][kafka] Add migration 
tests for FlinkKafkaProdcuer and FlinkKafkaProducer011
URL: https://github.com/apache/flink/pull/7677
 
 
   This PR doesn't solve the problem of migrating from 0.11 to universal 
connector, it just adds regression tests to make sure that those two 
connectors' producers have state compatibility with previous Flink releases.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (**yes** / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] flinkbot edited a comment on issue #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-11 Thread GitBox
flinkbot edited a comment on issue #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#issuecomment-462382064
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @GJL [committer]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @GJL [committer]
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The change fits into the overall [architecture].
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-11 Thread GitBox
GJL commented on issue #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase 
to new code base
URL: https://github.com/apache/flink/pull/7676#issuecomment-462387760
 
 
   @flinkbot approve description
   @flinkbot approve consensus 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] flinkbot commented on issue #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-11 Thread GitBox
flinkbot commented on issue #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676#issuecomment-462382064
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❌ 1. The [description] looks good.
   * ❌ 2. There is [consensus] that the contribution should go into to Flink.
   * ❔ 3. Needs [attention] from.
   * ❌ 4. The change fits into the overall [architecture].
   * ❌ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11364) Check and port TaskManagerFailsITCase to new code base if necessary

2019-02-11 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11364?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11364:
---
Labels: pull-request-available  (was: )

> Check and port TaskManagerFailsITCase to new code base if necessary
> ---
>
> Key: FLINK-11364
> URL: https://issues.apache.org/jira/browse/FLINK-11364
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
>
> Check and port {{TaskManagerFailsITCase}} to new code base if necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tillrohrmann opened a new pull request #7676: [FLINK-11364][tests] Port TaskManagerFailsITCase to new code base

2019-02-11 Thread GitBox
tillrohrmann opened a new pull request #7676: [FLINK-11364][tests] Port 
TaskManagerFailsITCase to new code base
URL: https://github.com/apache/flink/pull/7676
 
 
   ## What is the purpose of the change
   
   Port `TaskManagerFailsITCase` to new code base.
   
   ## Brief change log
   
   - "detect a failing task manager" --> 
JobMaster#testHeartbeatTimeoutWithTaskManager
   
   - "handle gracefully failing task manager" --> 
JobMasterTest#testJobFailureWhenGracefulTaskExecutorTermination
   
   - "handle hard failing task manager" --> 
JobMasterTest#testJobFailureWhenTaskExecutorHeartbeatTimeout
   
   - "go into a clean state in case of a TaskManager failure" --> 
TaskExecutorITCase#testNewTaskExecutorJoinsCluster
   
   ## Verifying this change
   
   - Run ported tests
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11578) Check and port BackPressureStatsTrackerImplITCase to new code base if necessary

2019-02-11 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-11578:
-
Description: Check and port {{BackPressureStatsTrackerImplITCase}} to new 
code base if necessary.  (was: Check and port 
{{StackTraceSampleCoordinatorITCase}} to new code base if necessary.)

> Check and port BackPressureStatsTrackerImplITCase to new code base if 
> necessary
> ---
>
> Key: FLINK-11578
> URL: https://issues.apache.org/jira/browse/FLINK-11578
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Check and port {{BackPressureStatsTrackerImplITCase}} to new code base if 
> necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11577) Check and port StackTraceSampleCoordinatorITCase to new code base if necessary

2019-02-11 Thread Gary Yao (JIRA)
Gary Yao created FLINK-11577:


 Summary: Check and port StackTraceSampleCoordinatorITCase to new 
code base if necessary
 Key: FLINK-11577
 URL: https://issues.apache.org/jira/browse/FLINK-11577
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Gary Yao
Assignee: Gary Yao
 Fix For: 1.8.0


Check and port {{ClusterShutdownITCase}} to new code base if necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11578) Check and port BackPressureStatsTrackerImplITCase to new code base if necessary

2019-02-11 Thread Gary Yao (JIRA)
Gary Yao created FLINK-11578:


 Summary: Check and port BackPressureStatsTrackerImplITCase to new 
code base if necessary
 Key: FLINK-11578
 URL: https://issues.apache.org/jira/browse/FLINK-11578
 Project: Flink
  Issue Type: Sub-task
  Components: Tests
Reporter: Gary Yao
Assignee: Gary Yao
 Fix For: 1.8.0


Check and port {{StackTraceSampleCoordinatorITCase}} to new code base if 
necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11577) Check and port StackTraceSampleCoordinatorITCase to new code base if necessary

2019-02-11 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-11577:
-
Description: Check and port {{StackTraceSampleCoordinatorITCase}} to new 
code base if necessary.  (was: Check and port {{ClusterShutdownITCase}} to new 
code base if necessary.)

> Check and port StackTraceSampleCoordinatorITCase to new code base if necessary
> --
>
> Key: FLINK-11577
> URL: https://issues.apache.org/jira/browse/FLINK-11577
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Check and port {{StackTraceSampleCoordinatorITCase}} to new code base if 
> necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-11 Thread GitBox
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r255547663
 
 

 ##
 File path: 
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/AllroundMiniClusterTest.java
 ##
 @@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+import static 
org.apache.flink.configuration.JobManagerOptions.EXECUTION_FAILOVER_STRATEGY;
+
+/**
+ * DataStreamAllroundTestProgram on Minicluster.
+ */
+public class AllroundMiniClusterTest {
 
 Review comment:
   No, the intention is for manual debugging, it can be ignored and go to test 
nevertheless.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11511) Remove legacy class JobAttachmentClientActor

2019-02-11 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-11511:
-
Description: Remove legacy class JobAttachmentClientActor

> Remove legacy class JobAttachmentClientActor
> 
>
> Key: FLINK-11511
> URL: https://issues.apache.org/jira/browse/FLINK-11511
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Remove legacy class JobAttachmentClientActor



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flinkbot edited a comment on issue #7637: [FLINK-11511] Remove legacy class JobAttachmentClientActor

2019-02-11 Thread GitBox
flinkbot edited a comment on issue #7637: [FLINK-11511] Remove legacy class 
JobAttachmentClientActor
URL: https://github.com/apache/flink/pull/7637#issuecomment-459738301
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @GJL [committer]
   * ✅ 2. There is [consensus] that the contribution should go into to Flink.
   - Approved by @GJL [committer]
   * ❔ 3. Needs [attention] from.
   * ✅ 4. The change fits into the overall [architecture].
   - Approved by @GJL [committer]
   * ✅ 5. Overall code [quality] is good.
   - Approved by @GJL [committer]
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
 Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve the 1st aspect (similarly, it 
also supports the `consensus`, `architecture` and `quality` keywords)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7637: [FLINK-11511] Remove legacy class JobAttachmentClientActor

2019-02-11 Thread GitBox
GJL commented on issue #7637: [FLINK-11511] Remove legacy class 
JobAttachmentClientActor
URL: https://github.com/apache/flink/pull/7637#issuecomment-462359647
 
 
   @flinkbot approve all


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7637: [FLINK-11511] Remove legacy class JobAttachmentClientActor

2019-02-11 Thread GitBox
GJL commented on issue #7637: [FLINK-11511] Remove legacy class 
JobAttachmentClientActor
URL: https://github.com/apache/flink/pull/7637#issuecomment-462359456
 
 
   LGTM, merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-11 Thread GitBox
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r255543476
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/PreviousAllocationSlotSelectionStrategy.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmaster.SlotInfo;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * This class implements a {@link SlotSelectionStrategy} that is based on 
previous allocations and
+ * falls back to using location preference hints if there is no previous 
allocation.
+ */
+public enum PreviousAllocationSlotSelectionStrategy implements 
SlotSelectionStrategy {
 
 Review comment:
   There was a previous unit test since 1.7 for the local recovery fix, it was 
not yet wired to the new code and did in fact use an outdated method. I fixed 
the wiring, slightly extended it and also use a subset of the test for 
`LocationPreferenceSlotSelection`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11508) Remove invalid test AkkaJobManagerRetrieverTest

2019-02-11 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-11508.

Resolution: Fixed

> Remove invalid test AkkaJobManagerRetrieverTest
> ---
>
> Key: FLINK-11508
> URL: https://issues.apache.org/jira/browse/FLINK-11508
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Remove invalid test AkkaJobManagerRetrieverTest



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11508) Remove invalid test AkkaJobManagerRetrieverTest

2019-02-11 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-11508:
-
Description: Remove invalid test AkkaJobManagerRetrieverTest

> Remove invalid test AkkaJobManagerRetrieverTest
> ---
>
> Key: FLINK-11508
> URL: https://issues.apache.org/jira/browse/FLINK-11508
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Remove invalid test AkkaJobManagerRetrieverTest



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-11508) Remove invalid test AkkaJobManagerRetrieverTest

2019-02-11 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reopened FLINK-11508:
--

> Remove invalid test AkkaJobManagerRetrieverTest
> ---
>
> Key: FLINK-11508
> URL: https://issues.apache.org/jira/browse/FLINK-11508
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-11507) Remove invalid test JobClientActorTest

2019-02-11 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-11507.

Resolution: Fixed

Fixed via e6b5eda7359cd1fd2c58f5e33b844c4ea061294a

> Remove invalid test JobClientActorTest
> --
>
> Key: FLINK-11507
> URL: https://issues.apache.org/jira/browse/FLINK-11507
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As discussed in FLINK-11146, {{JobClient}} and {{JobClientActor}} is a static 
> helper class as the bridge between the non-actor code and the {{JobManager}}. 
> In FLIP-6 codebase we will finally converge to a {{NewClusterClient}}. Thus 
> this test is invalid.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #7633: [FLINK-11508] Remove invalid test AkkaJobManagerRetrieverTest

2019-02-11 Thread GitBox
asfgit closed pull request #7633: [FLINK-11508] Remove invalid test 
AkkaJobManagerRetrieverTest
URL: https://github.com/apache/flink/pull/7633
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] asfgit closed pull request #7635: [FLINK-11507] Remove invalid test JobClientActorTest

2019-02-11 Thread GitBox
asfgit closed pull request #7635:  [FLINK-11507] Remove invalid test 
JobClientActorTest
URL: https://github.com/apache/flink/pull/7635
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Closed] (FLINK-11508) Remove invalid test AkkaJobManagerRetrieverTest

2019-02-11 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-11508.

Resolution: Fixed

Fixed via 6ae2ed40830bd3bf4c94b0038c373f25c5cdba7b

> Remove invalid test AkkaJobManagerRetrieverTest
> ---
>
> Key: FLINK-11508
> URL: https://issues.apache.org/jira/browse/FLINK-11508
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.8.0
>Reporter: TisonKun
>Assignee: TisonKun
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10834) TableAPI flatten() calculated value error

2019-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10834:

Fix Version/s: (was: 1.7.2)
   1.7.3

> TableAPI flatten() calculated value error
> -
>
> Key: FLINK-10834
> URL: https://issues.apache.org/jira/browse/FLINK-10834
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: sunjincheng
>Assignee: Hequn Cheng
>Priority: Major
> Fix For: 1.7.3
>
>
> We have a UDF as follows:
> {code:java}
> object FuncRow extends ScalarFunction {
> def eval(v: Int): Row = { 
>   val version = "" + new Random().nextInt()          
>   val row = new Row(3)          
>   row.setField(0, version)          
>   row.setField(1, version)          
>   row.setField(2, version)          
>   row 
> }
> override def isDeterministic: Boolean = false
> override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>  Types.ROW(Types.STRING, Types.STRING, Types.STRING)
> }
> {code}
> Do the following Query:
> {code:sql}
> val data = new mutable.MutableList[(Int, Long, String)]
>  data.+=((1, 1L, "Hi"))
>  val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b,'c)
>  .select(FuncRow('a).flatten()).as('v1, 'v2, 'v3)
> {code}
> The result is : -1189206469,-151367792,1988676906
> The result expected by the user should be:  v1==v2==v3 .
> It looks the real reason is that there is no result of the reuse in codegen.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11260) Bump Janino compiler dependency

2019-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-11260:

Fix Version/s: (was: 1.7.2)
   1.7.3

> Bump Janino compiler dependency
> ---
>
> Key: FLINK-11260
> URL: https://issues.apache.org/jira/browse/FLINK-11260
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.7.1
>Reporter: Fokko Driesprong
>Assignee: Fokko Driesprong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.3
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Bump the Janino dependency: 
> http://janino-compiler.github.io/janino/changelog.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9228) log details about task fail/task manager is shutting down

2019-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9228:
---
Fix Version/s: (was: 1.7.2)
   1.7.3

> log details about task fail/task manager is shutting down
> -
>
> Key: FLINK-9228
> URL: https://issues.apache.org/jira/browse/FLINK-9228
> Project: Flink
>  Issue Type: Improvement
>  Components: Logging
>Affects Versions: 1.4.2
>Reporter: makeyang
>Assignee: makeyang
>Priority: Minor
> Fix For: 1.6.4, 1.7.3, 1.8.0
>
>
> condition:
> flink version:1.4.2
> jdk version:1.8.0.20
> linux version:3.10.0
> problem description:
> one of my task manager is out of the cluster and I checked its log found 
> something below: 
> 2018-04-19 22:34:47,441 INFO  org.apache.flink.runtime.taskmanager.Task       
>               
> - Attempting to fail task externally Process (115/120) 
> (19d0b0ce1ef3b8023b37bdfda643ef44). 
> 2018-04-19 22:34:47,441 INFO  org.apache.flink.runtime.taskmanager.Task       
>               
> - Process (115/120) (19d0b0ce1ef3b8023b37bdfda643ef44) switched from RUNNING 
> to FAILED. 
> java.lang.Exception: TaskManager is shutting down. 
>         at 
> org.apache.flink.runtime.taskmanager.TaskManager.postStop(TaskManager.scala:220)
>  
>         at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) 
>         at 
> org.apache.flink.runtime.taskmanager.TaskManager.aroundPostStop(TaskManager.scala:121)
>  
>         at 
> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>  
>         at 
> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) 
>         at akka.actor.ActorCell.terminate(ActorCell.scala:374) 
>         at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) 
>         at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) 
>         at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) 
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) 
>         at akka.dispatch.Mailbox.run(Mailbox.scala:224) 
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:234) 
>         at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
>         at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  
>         at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
>         at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  
> suggestion:
>  # short term suggestion:
>  ## log reasons why task tail?maybe received some event from job 
> manager/can't connect to job manager? operator exception? the more claritify 
> the better
>  ## log reasons why task manager is shutting down? received some event from 
> job manager/can't connect to job manager? operator exception can't be 
> recovery?
>  # long term suggestion:
>  ## define the state machine of flink node clearly. if nothing happens, the 
> node should stay what it used to be, which means if it is processing events, 
> if nothing happens, it should still processing events.or in other words, if 
> its state changes from processing event to cancel, then event happens.
>  ## define the events which can cause node state changed clearly. like use 
> cancel, operator exception, heart beat timeout etc
>  ## log the state change and event which cause state chaged clearly in logs
>  ## show event details(time, node, event, state changed etc) in webui



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10875) Add `toTableWithTimestamp` method in `DataStreamConversions`

2019-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10875:

Fix Version/s: (was: 1.7.2)
   1.7.3

> Add `toTableWithTimestamp` method in `DataStreamConversions`
> 
>
> Key: FLINK-10875
> URL: https://issues.apache.org/jira/browse/FLINK-10875
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: sunjincheng
>Assignee: sunjincheng
>Priority: Minor
> Fix For: 1.7.3
>
>
> Currently we convert a `DataStream` to a `Table` by  
> `DataStreamConversions#toTable`, e.g.:
> {code:java}
> // Without TimeAttribute
> ...
> val stream = env.fromCollection(...)
> val tab = stream.toTable(tEnv, 'a, 'b, 'c)
> val result = tab.select('a, 'b)
> 
> // With TimeAttribute
> ...
> val stream = env.fromCollection(...).assignTimestampsAndWatermarks(...)
> val tab = stream.toTable(tEnv, 'a, 'b, 'c, 'ts.rowtime)
> val result = tab.window(Session withGap 5.milli on 'ts as 'w)
> ...{code}
> I think the fieldNames parameter in the `toTable` method is reasonable in the 
> conversion without the time attribute, because the fieldNames will actually 
> correspond to the fields of the physical table, but when applied to the 
> conversion with the time attribute, the time attribute column is silently 
> added to the table. This feeling is very Magical, so I recommend adding a 
> method that allows the user to display the time attribute added to the 
> physical table: `toTableWithTimestamp`, which is automatically named to the 
> time attribute column named by user input and TimeCharacteristic, eg:
> {code:java}
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> ...
> val table = stream.toTableWithTimestamp(tEnv, 'count, 'size, 'name, 'ts)
>   .window(Tumble over 2.rows on 'ts as 'w)
> ...
> {code}
> In the example above the flink will mark `ts` ad a `RowtimeAttribute`.
> What do you think ? 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11259) Bump Zookeeper dependency to 3.4.13

2019-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-11259:

Fix Version/s: (was: 1.7.2)
   1.7.3

> Bump Zookeeper dependency to 3.4.13
> ---
>
> Key: FLINK-11259
> URL: https://issues.apache.org/jira/browse/FLINK-11259
> Project: Flink
>  Issue Type: Improvement
>  Components: Cluster Management
>Affects Versions: 1.7.1
>Reporter: Fokko Driesprong
>Assignee: Fokko Driesprong
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.3
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Bump Zookeeper to 3.4.13
> https://zookeeper.apache.org/doc/r3.4.13/releasenotes.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11250) fix thread leaked when StreamTask switched from DEPLOYING to CANCELING

2019-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-11250:

Fix Version/s: (was: 1.7.2)
   1.7.3

> fix thread leaked when StreamTask switched from DEPLOYING to CANCELING
> --
>
> Key: FLINK-11250
> URL: https://issues.apache.org/jira/browse/FLINK-11250
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime, Streaming
>Affects Versions: 1.5.6, 1.6.3, 1.7.1
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.3
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> begin flink-1.5.x version, streamRecordWriters was created in StreamTask's 
> constructor, which start OutputFlusher daemon thread. so when task switched 
> from DEPLOYING to CANCELING state, the daemon thread will be leaked.
>  
> *reproducible example*
> {code:java}
> public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(5000);
> env
> .addSource(new SourceFunction() {
> @Override
> public void run(SourceContext ctx) throws Exception {
> for (int i = 0; i < 1; i++) {
> Thread.sleep(100);
> ctx.collect("data " + i);
> }
> }
> @Override
> public void cancel() {
> }
> })
> .addSink(new RichSinkFunction() {
> @Override
> public void open(Configuration parameters) throws Exception {
> System.out.println(1 / 0);
> }
> @Override
> public void invoke(String value, Context context) throws 
> Exception {
> }
> }).setParallelism(2);
> env.execute();
> }{code}
> *some useful log*
> {code:java}
> 2019-01-02 03:03:47.525 [thread==> jobmanager-future-thread-2] 
> executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) 
> (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from CREATED to SCHEDULED.
> 2019-01-02 03:03:47.526 [thread==> flink-akka.actor.default-dispatcher-5] 
> slotpool.SlotPool#allocateSlot:326 Received slot request 
> [SlotRequestId{12bfcf1674f5b96567a076086dbbfd1b}] for task: Attempt #1 
> (Source: Custom Source (1/1)) @ (unassigned) - [SCHEDULED]
> 2019-01-02 03:03:47.527 [thread==> flink-akka.actor.default-dispatcher-5] 
> slotpool.SlotSharingManager#createRootSlot:151 Create multi task slot 
> [SlotRequestId{494e47eb8318e2c0a1db91dda6b8}] in slot 
> [SlotRequestId{6d7f0173c1d48e5559f6a14b080ee817}].
> 2019-01-02 03:03:47.527 [thread==> flink-akka.actor.default-dispatcher-5] 
> slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create 
> single task slot [SlotRequestId{12bfcf1674f5b96567a076086dbbfd1b}] in multi 
> task slot [SlotRequestId{494e47eb8318e2c0a1db91dda6b8}] for group 
> bc764cd8ddf7a0cff126f51c16239658.
> 2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] 
> slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create 
> single task slot [SlotRequestId{8a877431375df8aeadb2fd845cae15fc}] in multi 
> task slot [SlotRequestId{494e47eb8318e2c0a1db91dda6b8}] for group 
> 0a448493b4782967b150582570326227.
> 2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] 
> slotpool.SlotSharingManager#createRootSlot:151 Create multi task slot 
> [SlotRequestId{56a36d3902ee1a7d0e2e84f50039c1ca}] in slot 
> [SlotRequestId{dbf5c9fa39f1e5a0b34a4a8c10699ee5}].
> 2019-01-02 03:03:47.528 [thread==> flink-akka.actor.default-dispatcher-2] 
> slotpool.SlotSharingManager$MultiTaskSlot#allocateSingleTaskSlot:426 Create 
> single task slot [SlotRequestId{5929c12b52dccee682f86afbe1cff5cf}] in multi 
> task slot [SlotRequestId{56a36d3902ee1a7d0e2e84f50039c1ca}] for group 
> 0a448493b4782967b150582570326227.
> 2019-01-02 03:03:47.529 [thread==> flink-akka.actor.default-dispatcher-5] 
> executiongraph.Execution#transitionState:1316 Source: Custom Source (1/1) 
> (74a4ed4bb2f80aa2b98e11bd09ea64ef) switched from SCHEDULED to DEPLOYING.
> 2019-01-02 03:03:47.529 [thread==> flink-akka.actor.default-dispatcher-5] 
> executiongraph.Execution#deploy:576 Deploying Source: Custom Source (1/1) 
> (attempt #1) to localhost
> 2019-01-02 03:03:47.530 [thread==> flink-akka.actor.default-dispatcher-2] 
> state.TaskExecutorLocalStateStoresManager#localStateStoreForSubtask:162 
> Registered new local state store with configuration 
> 

[jira] [Updated] (FLINK-11116) Overwrite outdated in-progress files in StreamingFileSink.

2019-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6:

Fix Version/s: (was: 1.7.2)
   1.7.3

> Overwrite outdated in-progress files in StreamingFileSink.
> --
>
> Key: FLINK-6
> URL: https://issues.apache.org/jira/browse/FLINK-6
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.3
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In order to guarantee exactly-once semantics, the streaming file sink is 
> implementing a two-phase commit protocol when writing files to the filesystem.
> Initially data is written to in-progress files. These files are then put into 
> "pending" state when they are completed (based on the rolling policy), and 
> they are finally committed when the checkpoint that put them in the "pending" 
> state is acknowledged as complete.
> The above shows that in the case that we have:
> 1) checkpoints A, B, C coming 
> 2) checkpoint A being acknowledged and 
> 3) failure
> Then we may have files that do not belong to any checkpoint (because B and C 
> were not considered successful). These files are currently not cleaned up.
> In order to reduce the amount of such files created, we removed the random 
> suffix from in-progress temporary files, so that the next in-progress file 
> that is opened for this part, overwrites them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11107) [state] Avoid memory stateBackend to create arbitrary folders under HA path when no checkpoint path configured

2019-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-11107:

Fix Version/s: (was: 1.7.2)
   1.7.3

> [state] Avoid memory stateBackend to create arbitrary folders under HA path 
> when no checkpoint path configured
> --
>
> Key: FLINK-11107
> URL: https://issues.apache.org/jira/browse/FLINK-11107
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.4, 1.7.3
>
>
> Currently, memory state-backend would create a folder named with random UUID 
> under HA directory if no checkpoint path ever configured. (the code logic 
> locates within {{StateBackendLoader#fromApplicationOrConfigOrDefault}}) 
> However, the default memory state-backend would not only be created on JM 
> side, but also on each task manager's side, which means many folders with 
> random UUID would be created under HA directory. It would result in exception 
> like:
> {noformat}
> The directory item limit of /tmp/flink/ha is exceeded: limit=1048576 
> items=1048576{noformat}
>  If this happens, no new jobs could be submitted only if we clean up those 
> directories manually.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11061) Add travis profile that would run on each commit with scala 2.12

2019-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-11061:

Fix Version/s: (was: 1.7.2)
   1.7.3

> Add travis profile that would run on each commit with scala 2.12
> 
>
> Key: FLINK-11061
> URL: https://issues.apache.org/jira/browse/FLINK-11061
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Affects Versions: 1.7.0, 1.8.0
>Reporter: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.7.3, 1.8.0
>
>
> In flink 1.7.0 we introduced support for scala 2.12 therefore we should add a 
> profile in travis that we check we do not break scala 2.12 compatibility.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11055) Allow Queryable State to be transformed on the TaskManager before being returned to the client

2019-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-11055:

Fix Version/s: (was: 1.7.2)
   1.7.3

> Allow Queryable State to be transformed on the TaskManager before being 
> returned to the client
> --
>
> Key: FLINK-11055
> URL: https://issues.apache.org/jira/browse/FLINK-11055
> Project: Flink
>  Issue Type: New Feature
>  Components: Queryable State
>Reporter: Galen Warren
>Priority: Major
> Fix For: 1.7.3
>
>
> The proposal here is to enhance the way Queryable State works to allow for 
> the state object to be transformed on the TaskManager before being returned 
> to the client. As an example, if some MapState were made queryable, such 
> a transform might look up a specific key in the map and return its 
> corresponding value, resulting in only that value being returned to the 
> client instead of the entire map. This could be useful in cases where the 
> client only wants a portion of the state and the state is large (this is my 
> use case).
> At a high level, I think this could be accomplished by adding an (optional) 
> serializable Function into KvStateRequest (and related 
> classes?) and having that transform be applied in the QueryableStateServer 
> (or QueryableStateClientProxy?). I expect some additional TypeInformation 
> would also have to be supplied/used in places. It should be doable in a 
> backwards compatible way such that if the client does not specify a transform 
> it works exactly as it does now.
> Would there be any interested in a PR for this? This would help me for 
> something I'm currently working on and I'd be willing to take a crack at it. 
> If there is interest, I'll be happy to do some more research to come up with 
> a more concrete proposal.
> Thanks for Flink - it's great!
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10954) Hardlink from files of previous local stored state might cross devices

2019-02-11 Thread Tzu-Li (Gordon) Tai (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10954?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-10954:

Fix Version/s: (was: 1.7.2)
   1.7.3

> Hardlink from files of previous local stored state might cross devices
> --
>
> Key: FLINK-10954
> URL: https://issues.apache.org/jira/browse/FLINK-10954
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.6.2
>Reporter: Yun Tang
>Assignee: Yun Tang
>Priority: Major
> Fix For: 1.6.4, 1.7.3, 1.8.0
>
>
> Currently, local recovery's base directories is initialized from 
> '{{io.tmp.dirs}}' if parameter '{{taskmanager.state.local.root-dirs}}' is not 
> set. For Yarn environment, the tmp dirs is replaced by its '{{LOCAL_DIRS}}', 
> which might consist of directories from different devices, such as 
> /dump/1/nm-local-dir, /dump/2/nm-local-dir. The local directory for RocksDB 
> is initialized from IOManager's spillingDirectories, which might located in 
> different device from local recovery's folder. However, hard-link between 
> different devices is not allowed, it will throw exception below:
> {code:java}
> java.nio.file.FileSystemException: target -> souce: Invalid cross-device link
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-11 Thread GitBox
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r255512747
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
 ##
 @@ -111,7 +112,8 @@ public void testScheduleImmediately() throws Exception {
 
assertEquals(5, 
testingSlotProvider.getNumberOfAvailableSlots());
}
-   
+
+   @Ignore
 
 Review comment:
   Yes, it could still make sense to test, even without the concurrency aspect.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-11 Thread GitBox
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r255510916
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java
 ##
 @@ -109,16 +111,15 @@ public void testPayloadAssignmentAfterRelease() {
assertThat(singleLogicalSlot.tryAssignPayload(dummyPayload), 
is(false));
}
 
+   @Ignore
 
 Review comment:
   I think it can be removed because it makes no longer sense to test because 
`returnLogicalSlot` is no longer returning a future.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2019-02-11 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r255508677
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -105,11 +119,235 @@ public void testHandleMixedLargeRecords() throws 
Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds - 
failure report comes
+* from an additional check in {@link 
SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws 
Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   32 * 1024);
+   }
+
+   /**
+* Non-spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws 
Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> serializedLength,
+   isA(IndexOutOfBoundsException.class));
+   }
+
+   /**
+* Spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchSpanning1() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> serializedLength - 1,
+   isA(EOFException.class));
+   }
+
+   /**
+* Spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchSpanning2() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> 1,
+   isA(EOFException.class));
+   }
+
+   /**
+* Spanning, spilling, deserialization reads one byte too many.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchSpanningLargeRecord() throws 
Exception {
+   testHandleWrongDeserialization(
+   LargeObjectTypeDeserializingTooMuch.getRandom(),
+   32 * 1024,
+   isA(EOFException.class));
+   }
+
+   /**
+* Non-spanning, deserialization forgets to read one byte - failure 
report comes from an
+* additional check in {@link 
SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughNonSpanning() throws 
Exception {
+   testHandleWrongDeserialization(
+   DeserializingNotEnough.getValue(),
+   32 * 1024);
+   }
+
+   /**
+* Spanning, deserialization forgets to read one byte - failure report 
comes from an additional
+* check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughSpanning1() throws 
Exception {
+   testHandleWrongDeserialization(
+   DeserializingNotEnough.getValue(),
+   (serializedLength) -> serializedLength - 1);
+   }
+
+   /**
+* Spanning, serialization length is 17 (including headers), 
deserialization forgets to read one
+* byte - failure report comes from an additional check in {@link 
SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughSpanning2() throws 
Exception {
+   testHandleWrongDeserialization(
+   DeserializingNotEnough.getValue(),
+   1);
+   }
+
+   /**
+* Spanning, spilling, deserialization forgets to read one byte - 
failure report comes from an
+* additional check in {@link 
SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughSpanningLargeRecord() 
throws Exception {
+   testHandleWrongDeserialization(
+   LargeObjectTypeDeserializingNotEnough.getRandom(),
+   32 * 1024);
+   }
+
+   private void testHandleWrongDeserialization(
+   WrongDeserializationValue testValue,
+   IntFunction segmentSizeProvider,
+   Matcher expectedCause) throws 
Exception {
+   expectedException.expectCause(expectedCause);
+   testHandleWrongDeserialization(testValue, 

[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-11 Thread GitBox
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r255505919
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
 ##
 @@ -97,34 +94,30 @@
 
private TestingResourceManagerGateway resourceManagerGateway;
 
+   private ComponentMainThreadExecutor mainThreadExecutor =
+   
TestingComponentMainThreadExecutorServiceAdapter.forMainThread();
 
 Review comment:
   It was already not concurrent anymore, so yes we can simplify.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-11 Thread GitBox
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r255502651
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.java
 ##
 @@ -69,33 +62,18 @@
 import static org.junit.Assert.fail;
 
 /**
- * Tests for the SlotPool using a proper RPC setup.
+ * Tests for the SlotPoolImpl interactions.
  */
-public class SlotPoolRpcTest extends TestLogger {
-
-   private static RpcService rpcService;
-
-   private static final Time timeout = Time.seconds(10L);
+public class SlotPoolInteractionsTest extends TestLogger {
 
 Review comment:
   I think it still somewhat required because the test involve timeouts, and 
the simple direct main thread executor will always run into a violation if it 
is entered through the scheduled executor of the timeout.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] NicoK commented on a change in pull request #6705: [FLINK-10356][network] add sanity checks to SpillingAdaptiveSpanningRecordDeserializer

2019-02-11 Thread GitBox
NicoK commented on a change in pull request #6705: [FLINK-10356][network] add 
sanity checks to SpillingAdaptiveSpanningRecordDeserializer
URL: https://github.com/apache/flink/pull/6705#discussion_r252367679
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
 ##
 @@ -105,11 +119,235 @@ public void testHandleMixedLargeRecords() throws 
Exception {
testSerializationRoundTrip(originalRecords, segmentSize);
}
 
+   /**
+* Non-spanning, deserialization reads one byte too many and succeeds - 
failure report comes
+* from an additional check in {@link 
SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning1() throws 
Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   32 * 1024);
+   }
+
+   /**
+* Non-spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchNonSpanning2() throws 
Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> serializedLength,
+   isA(IndexOutOfBoundsException.class));
+   }
+
+   /**
+* Spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchSpanning1() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> serializedLength - 1,
+   isA(EOFException.class));
+   }
+
+   /**
+* Spanning, deserialization reads one byte too many and fails.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchSpanning2() throws Exception {
+   testHandleWrongDeserialization(
+   DeserializingTooMuch.getValue(),
+   (serializedLength) -> 1,
+   isA(EOFException.class));
+   }
+
+   /**
+* Spanning, spilling, deserialization reads one byte too many.
+*/
+   @Test
+   public void testHandleDeserializingTooMuchSpanningLargeRecord() throws 
Exception {
+   testHandleWrongDeserialization(
+   LargeObjectTypeDeserializingTooMuch.getRandom(),
+   32 * 1024,
+   isA(EOFException.class));
+   }
+
+   /**
+* Non-spanning, deserialization forgets to read one byte - failure 
report comes from an
+* additional check in {@link 
SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughNonSpanning() throws 
Exception {
+   testHandleWrongDeserialization(
+   DeserializingNotEnough.getValue(),
+   32 * 1024);
+   }
+
+   /**
+* Spanning, deserialization forgets to read one byte - failure report 
comes from an additional
+* check in {@link SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughSpanning1() throws 
Exception {
+   testHandleWrongDeserialization(
+   DeserializingNotEnough.getValue(),
+   (serializedLength) -> serializedLength - 1);
+   }
+
+   /**
+* Spanning, serialization length is 17 (including headers), 
deserialization forgets to read one
+* byte - failure report comes from an additional check in {@link 
SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughSpanning2() throws 
Exception {
+   testHandleWrongDeserialization(
+   DeserializingNotEnough.getValue(),
+   1);
+   }
+
+   /**
+* Spanning, spilling, deserialization forgets to read one byte - 
failure report comes from an
+* additional check in {@link 
SpillingAdaptiveSpanningRecordDeserializer}.
+*/
+   @Test
+   public void testHandleDeserializingNotEnoughSpanningLargeRecord() 
throws Exception {
+   testHandleWrongDeserialization(
+   LargeObjectTypeDeserializingNotEnough.getRandom(),
+   32 * 1024);
+   }
+
+   private void testHandleWrongDeserialization(
+   WrongDeserializationValue testValue,
+   IntFunction segmentSizeProvider,
+   Matcher expectedCause) throws 
Exception {
+   expectedException.expectCause(expectedCause);
+   testHandleWrongDeserialization(testValue, 

[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-11 Thread GitBox
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r255498671
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolResource.java
 ##
 @@ -19,54 +19,55 @@
 package org.apache.flink.runtime.jobmaster.slotpool;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
-import org.apache.flink.runtime.rpc.RpcService;
 
 import org.junit.rules.ExternalResource;
 
 import javax.annotation.Nonnull;
 
+import java.util.HashMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 /**
- * {@link ExternalResource} which provides a {@link SlotPool}.
+ * {@link ExternalResource} which provides a {@link SlotPoolImpl}.
  */
 public class SlotPoolResource extends ExternalResource {
 
 Review comment:
   We could, but it is still rather convenient to have it in tests. I would not 
drop it only for that purpose because it still feels like it saves you code 
lines in testing.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-11529) Support Chinese Documents for Apache Flink

2019-02-11 Thread Jark Wu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-11529:
---

Assignee: Jark Wu

> Support Chinese Documents for Apache Flink
> --
>
> Key: FLINK-11529
> URL: https://issues.apache.org/jira/browse/FLINK-11529
> Project: Flink
>  Issue Type: New Feature
>  Components: chinese-translation, Documentation
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
>
> This issue is an umbrella issue for tracking fully support Chinese for Flink 
> documents (http://ci.apache.org/projects/flink/flink-docs-master/).
> A more detailed description can be found in the proposal doc: 
> https://docs.google.com/document/d/1R1-uDq-KawLB8afQYrczfcoQHjjIhq6tvUksxrfhBl0/edit#



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-4330) Consider removing min()/minBy()/max()/maxBy()/sum() utility methods from the DataStream API

2019-02-11 Thread Valeria Vasylieva (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valeria Vasylieva updated FLINK-4330:
-
Attachment: (was: 1.url)

> Consider removing min()/minBy()/max()/maxBy()/sum() utility methods from the 
> DataStream API
> ---
>
> Key: FLINK-4330
> URL: https://issues.apache.org/jira/browse/FLINK-4330
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Robert Metzger
>Priority: Major
> Fix For: 2.0.0
>
>
> I think we should consider removing the min()/minBy()/max()/maxBy()/sum() 
> utility methods from the DataStream API. They make the maintenance of the 
> code unnecessary complicated, and don't add enough value for the users (who 
> can not access the window metadata).
> If we are keeping the methods, we should consolidate the min/minBy methods: 
> the difference is subtle, and minBy can subsume the min method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-4330) Consider removing min()/minBy()/max()/maxBy()/sum() utility methods from the DataStream API

2019-02-11 Thread Valeria Vasylieva (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-4330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valeria Vasylieva updated FLINK-4330:
-
Attachment: 1.url

> Consider removing min()/minBy()/max()/maxBy()/sum() utility methods from the 
> DataStream API
> ---
>
> Key: FLINK-4330
> URL: https://issues.apache.org/jira/browse/FLINK-4330
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Reporter: Robert Metzger
>Priority: Major
> Fix For: 2.0.0
>
>
> I think we should consider removing the min()/minBy()/max()/maxBy()/sum() 
> utility methods from the DataStream API. They make the maintenance of the 
> code unnecessary complicated, and don't add enough value for the users (who 
> can not access the window metadata).
> If we are keeping the methods, we should consolidate the min/minBy methods: 
> the difference is subtle, and minBy can subsume the min method.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StefanRRichter commented on a change in pull request #7662: [FLINK-10431] Extraction of scheduling-related code from SlotPool into preliminary Scheduler (rebased)

2019-02-11 Thread GitBox
StefanRRichter commented on a change in pull request #7662: [FLINK-10431] 
Extraction of scheduling-related code from SlotPool into preliminary Scheduler 
(rebased)
URL: https://github.com/apache/flink/pull/7662#discussion_r255495062
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 ##
 @@ -66,11 +74,13 @@ public void testExecutionGraphRestartTimeMetric() throws 
JobException, IOExcepti
jobVertex.setInvokableClass(NoOpInvokable.class);
JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
 
+   Scheduler scheduler = mock(Scheduler.class);
 
 Review comment:
   That implementation is already there and used, so this line can just be 
deleted. It is leftover from the rebase.t.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


  1   2   >