[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283159#comment-16283159
 ] 

ASF GitHub Bot commented on FLINK-8220:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/5134
  
@zentol putting this code in an external repository would make those 
benchmarks very fragile, since they depend on Flink internals. Almost any 
refactoring of Flink's network stack would break the external repository (of 
which most committers are not even aware) and would require coordinated 
commits, which would be unmaintainable.

@greghogan hmmm, that's a very tempting option; however, I'm not the one 
who must be convinced that such an option is legally viable.  


> Implement set of network throughput benchmarks in Flink
> ---
>
> Key: FLINK-8220
> URL: https://issues.apache.org/jira/browse/FLINK-8220
> Project: Flink
>  Issue Type: New Feature
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Benchmarks should be defined and implemented in the Flink project; they will 
> be executed in the {{flink-benchmarks}} project.
> Configurable parameters: the number of record writers and the number of channels.
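As a rough illustration of what "number of channels" means for such a benchmark, records are typically fanned out round-robin across the configured channels before being counted for throughput. The sketch below is illustrative only (class and method names are made up; the real benchmarks use Flink's `RecordWriter` and network stack internals):

```java
import java.util.Arrays;

// Sketch of the record-distribution step a network throughput benchmark
// needs: fan `numRecords` out round-robin over `channels` channels.
public class ThroughputSketch {
    static long[] distribute(long numRecords, int channels) {
        long[] perChannel = new long[channels];
        for (long i = 0; i < numRecords; i++) {
            perChannel[(int) (i % channels)]++; // round-robin channel selection
        }
        return perChannel;
    }

    public static void main(String[] args) {
        // 10 records over 4 channels: the first two channels get one extra.
        System.out.println(Arrays.toString(distribute(10, 4))); // [3, 3, 2, 2]
    }
}
```

In the actual benchmarks both the number of writer threads and the number of channels per writer would be swept as parameters.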



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)






[jira] [Commented] (FLINK-8175) remove flink-streaming-contrib and migrate its classes to flink-streaming-java/scala

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283113#comment-16283113
 ] 

ASF GitHub Bot commented on FLINK-8175:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5112
  
Hi @tillrohrmann , per our discussion, I sent out an email to dev@flink, 
but it seems it didn't catch any attention. What do you think about this PR?


> remove flink-streaming-contrib and migrate its classes to 
> flink-streaming-java/scala
> 
>
> Key: FLINK-8175
> URL: https://issues.apache.org/jira/browse/FLINK-8175
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.5.0
>
>
> I propose removing flink-streaming-contrib from flink-contrib and migrating 
> its classes to flink-streaming-java/scala for the following reasons:
> - flink-streaming-contrib is so small that it has only 4 classes (3 Java and 
> 1 Scala); they don't need a dedicated jar for Flink to distribute and 
> maintain, nor for users to deal with the overhead of dependency management
> - the 4 classes in flink-streaming-contrib are logically more closely tied to 
> flink-streaming-java/scala, and thus can be easily migrated
> - flink-contrib is already too crowded and noisy. It contains lots of 
> submodules with different purposes, which confuses developers and users, and 
> they lack a proper project hierarchy
> To take this a step further, I would argue that even flink-contrib should be 
> removed and all its submodules migrated to other top-level modules, for the 
> following reasons: 1) the whole Apache Flink project is itself the result of 
> contributions from many developers, so there is no reason to highlight 
> some contributions in a dedicated module named 'contrib'; 2) flink-contrib 
> inherently doesn't have a good hierarchy for holding submodules



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)




[jira] [Commented] (FLINK-8192) Properly annotate APIs of all Flink connectors with @Public / @PublicEvolving / @Internal

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16283108#comment-16283108
 ] 

ASF GitHub Bot commented on FLINK-8192:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5138

[FLINK-8192] Properly annotate APIs of flink-connector-kinesis

## What is the purpose of the change

Properly annotate classes in flink-connector-kinesis

## Brief change log

Properly annotate classes in flink-connector-kinesis

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

none

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8192

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5138.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5138


commit f1e9f8ac1323eaae2aa2a028ff1d03d7a2fbf658
Author: Bowen Li 
Date:   2017-12-07T08:02:16Z

annotate classes

commit e919aae934a0e9d7b65bb877acb64231e3b17120
Author: Bowen Li 
Date:   2017-12-08T06:12:23Z

modify annotation

commit f5d68ddba2ea0fb65bff9b300c5347cd8f7c35f5
Author: Bowen Li 
Date:   2017-12-08T06:16:34Z

modify annotations




> Properly annotate APIs of all Flink connectors with @Public / @PublicEvolving 
> / @Internal
> -
>
> Key: FLINK-8192
> URL: https://issues.apache.org/jira/browse/FLINK-8192
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Tzu-Li (Gordon) Tai
> Fix For: 1.5.0
>
>
> Currently, the APIs of the Flink connectors have absolutely no annotations 
> indicating whether their usage is {{Public}} / {{PublicEvolving}} / {{Internal}}.
> We have seen instances in the past where a user mistakenly used 
> an abstract internal base class in the Elasticsearch connector.
> This JIRA tracks the coverage of API usage annotations for all connectors 
> shipped with Flink. Ideally, a separate subtask should be created for each 
> individual connector.
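For context, these stability markers are ordinary Java annotations (in Flink they live in the flink-annotations module). The sketch below uses locally declared stand-in annotations and invented class names so it compiles without Flink on the classpath; it only illustrates the marking pattern and how tooling can detect it at runtime:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class AnnotationSketch {
    // Stand-ins for org.apache.flink.annotation.PublicEvolving / Internal.
    // RUNTIME retention makes them visible to reflection-based checks.
    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE)
    @interface PublicEvolving {}

    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE)
    @interface Internal {}

    // User-facing connector entry point: part of the evolving public API.
    @PublicEvolving
    static class FlinkKinesisConsumerSketch {}

    // Helper that users should not depend on.
    @Internal
    static class KinesisProxySketch {}

    public static void main(String[] args) {
        System.out.println(FlinkKinesisConsumerSketch.class
                .isAnnotationPresent(PublicEvolving.class)); // true
    }
}
```

A checkstyle rule or reflection scan can then flag any connector class that carries none of the three annotations.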



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4991: [FLINK-7928] [runtime] extend the resources in Res...

2017-12-07 Thread shuai-xu
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4991#discussion_r155708446
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 ---
@@ -61,16 +80,26 @@
 * @param heapMemoryInMB The size of the heap memory, in megabytes.
 * @param directMemoryInMB The size of the direct memory, in megabytes.
 * @param nativeMemoryInMB The size of the native memory, in megabytes.
+* @param memoryForInputInMB The size of the memory for input, in 
megabytes.
+* @param memoryForOutputInMB The size of the memory for output, in 
megabytes.
 */
public ResourceProfile(
double cpuCores,
int heapMemoryInMB,
int directMemoryInMB,
-   int nativeMemoryInMB) {
+   int nativeMemoryInMB,
+   int memoryForInputInMB,
+   int memoryForOutputInMB,
--- End diff --

I think the resource spec contains the resources a user needs to run their 
code, while the resource profile contains the resources for running a task. So 
the resource profile should also contain the part of the resources used by the 
Flink system. We divide that part into memoryForInputInMB and memoryForOutputInMB, 
and separate them from heap memory and direct memory so that different 
resource managers can choose different strategies. For example, a per-job 
resource manager needs all of these resources when allocating a task manager, but 
a session resource manager may not consider memoryForInputInMB and memoryForOutputInMB 
when assigning a slot, as that part is decided when the session cluster is 
created. Do you think it makes sense?
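The split described in the comment can be sketched as two aggregation strategies over the same profile. Field names follow the diff above; the two aggregation methods and their names are illustrative, not part of the PR:

```java
// Sketch: a per-job resource manager sizes a new TaskManager from the full
// profile, while a session resource manager ignores the input/output part,
// since that memory is fixed when the session cluster starts.
public class ResourceProfileSketch {
    final int heapMemoryInMB, directMemoryInMB, nativeMemoryInMB;
    final int memoryForInputInMB, memoryForOutputInMB;

    ResourceProfileSketch(int heap, int direct, int nativeMem, int in, int out) {
        this.heapMemoryInMB = heap;
        this.directMemoryInMB = direct;
        this.nativeMemoryInMB = nativeMem;
        this.memoryForInputInMB = in;
        this.memoryForOutputInMB = out;
    }

    /** Per-job RM: everything counts when allocating a TaskManager. */
    int totalMemoryForAllocation() {
        return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB
                + memoryForInputInMB + memoryForOutputInMB;
    }

    /** Session RM: network memory is already provisioned per cluster. */
    int memoryForSlotAssignment() {
        return heapMemoryInMB + directMemoryInMB + nativeMemoryInMB;
    }

    public static void main(String[] args) {
        ResourceProfileSketch p = new ResourceProfileSketch(512, 128, 64, 32, 32);
        System.out.println(p.totalMemoryForAllocation()); // 768
        System.out.println(p.memoryForSlotAssignment());  // 704
    }
}
```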


---




[jira] [Commented] (FLINK-7878) Extend the resource type user can define in ResourceSpec

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282937#comment-16282937
 ] 

ASF GitHub Bot commented on FLINK-7878:
---

Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/4911
  
@tillrohrmann , I agree with you that adding a builder looks better. I 
changed it according to your comments. Do you think it works now?


> Extend the resource type user can define in ResourceSpec
> 
>
> Key: FLINK-7878
> URL: https://issues.apache.org/jira/browse/FLINK-7878
> Project: Flink
>  Issue Type: Improvement
>  Components: DataSet API, DataStream API
>Reporter: shuai.xu
>Assignee: shuai.xu
>  Labels: flip-6
>
> Currently, Flink only lets users define how much CPU and memory an operator 
> uses, but the resources in a cluster are varied. For example, an application 
> for image processing may need GPUs, while others may need FPGAs. 
> CPU and memory alone are not enough, and the number of resource types keeps 
> growing, so we need to make ResourceSpec extensible.
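One common way to make such a spec extensible beyond fixed CPU/memory fields is a map of named resources behind a fluent builder. This is only a sketch of the idea under that assumption; class and method names are invented and not the API the PR settled on:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch: resources keyed by name, so GPU, FPGA, or any future type can
// be added without changing the class. The fluent `set` mirrors the
// builder style suggested in the review comment above.
public class ResourceSpecSketch {
    private final Map<String, Double> resources = new LinkedHashMap<>();

    public ResourceSpecSketch set(String name, double amount) {
        resources.put(name, amount);
        return this; // builder-style chaining
    }

    /** Unknown resource types default to zero. */
    public double get(String name) {
        return resources.getOrDefault(name, 0.0);
    }

    public static void main(String[] args) {
        ResourceSpecSketch spec = new ResourceSpecSketch()
                .set("CPU", 2.0)
                .set("MEM_MB", 1024)
                .set("GPU", 1.0); // extended resource type
        System.out.println(spec.get("GPU")); // 1.0
    }
}
```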



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5133: [hotfix] Fix typo in AkkaUtils method

2017-12-07 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5133
  
[ERROR] 
src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java:[41,8] 
(imports) UnusedImports: Unused import: java.net.InetSocketAddress.


---


[GitHub] flink issue #5133: [hotfix] Fix typo in AkkaUtils method

2017-12-07 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5133
  
@casidiablo please also describe why or how the removed code came to be 
unused. I see that `StandaloneHaServices#RESOURCE_MANAGER_RPC_ENDPOINT_NAME` 
was left unused by `433a345e`.


---


[jira] [Commented] (FLINK-8215) Collections codegen exception when constructing Array or Map via SQL API

2017-12-07 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282799#comment-16282799
 ] 

Rong Rong commented on FLINK-8215:
--

Agree. As long as the ValidationException is thrown for the Table API, we could go 
ahead and make the codegen support type widening by adding better type 
casts. Will go with option #2 then.

> Collections codegen exception when constructing Array or Map via SQL API
> 
>
> Key: FLINK-8215
> URL: https://issues.apache.org/jira/browse/FLINK-8215
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>
> The Table API goes through `LogicalNode.validate()`, which performs the 
> collection validation and rejects inconsistent types; this throws a 
> `ValidationException` for something like `array(1.0, 2.0f)`.
> The SQL API uses `FlinkPlannerImpl.validator(SqlNode)`, which relies on Calcite's 
> SqlNode validation and supports resolving the leastRestrictive type. `ARRAY[CAST(1 AS 
> DOUBLE), CAST(2 AS FLOAT)]` throws a codegen exception.
> The root cause is that the code generation for these collection value constructors 
> does not cast to or resolve the leastRestrictive type correctly. I see 2 options:
> 1. Strengthen validation to not allow resolving the leastRestrictive type in SQL.
> 2. Make codegen support the leastRestrictive type cast, e.g. by using 
> `generateCast` instead of a direct cast like `(ClassType) element`.
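For intuition, "least restrictive" resolution over numeric element types picks the widest type in the list, and option 2 then inserts a cast to that type per element. The sketch below uses a deliberately simplified widening order; the real rules live in Calcite's type system, not here:

```java
import java.util.Arrays;
import java.util.List;

public class LeastRestrictiveSketch {
    // Simplified numeric widening order; Calcite's actual rules are richer.
    static final List<Class<?>> ORDER =
            Arrays.asList(Integer.class, Float.class, Double.class);

    static Class<?> leastRestrictive(List<Class<?>> types) {
        Class<?> widest = types.get(0);
        for (Class<?> t : types) {
            if (ORDER.indexOf(t) > ORDER.indexOf(widest)) {
                widest = t; // keep the widest type seen so far
            }
        }
        return widest;
    }

    public static void main(String[] args) {
        // ARRAY[CAST(1 AS DOUBLE), CAST(2 AS FLOAT)] -> element type DOUBLE.
        // Codegen must then widen the FLOAT element to DOUBLE rather than
        // emitting a direct (Double) cast on a Float value, which fails.
        System.out.println(
                leastRestrictive(Arrays.asList(Double.class, Float.class)));
        // class java.lang.Double
    }
}
```

This is exactly why a direct `(ClassType) element` cast breaks: a boxed `Float` is not a `Double`, so the generated cast must go through an explicit widening conversion such as `generateCast` would emit.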



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5134: [FLINK-8220] Implement set of network benchmarks

2017-12-07 Thread casidiablo
Github user casidiablo commented on a diff in the pull request:

https://github.com/apache/flink/pull/5134#discussion_r155667757
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/benchmark/SerializingLongReceiver.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.benchmark;
+
+import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.types.LongValue;
+
+/**
+ * {@link ReceiverThread} that deserialize incoming messages.
+ */
+public class SerializingLongReceiver extends ReceiverThread {
+
+private long maxLatency = Long.MIN_VALUE;
+private long minLatency = Long.MAX_VALUE;
+private long sumLatency;
+private long sumLatencySquare;
+private int numSamples;
+
+private final MutableRecordReader reader;
+
+public SerializingLongReceiver(InputGate inputGate, int 
expectedRepetitionsOfExpectedRecord) {
+super(expectedRepetitionsOfExpectedRecord);
+this.reader = new MutableRecordReader<>(
+inputGate,
+new String[]{
+EnvironmentInformation.getTemporaryFileDirectory()
+});
+}
+
+protected void readRecords(long lastExpectedRecord) throws Exception {
--- End diff --

Add `@Override`?


---




[GitHub] flink issue #5072: [FLINK-7984][build] Bump snappy-java to 1.1.4

2017-12-07 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5072
  
@yew1eb have you looked at FLINK-6965?


---


[jira] [Commented] (FLINK-8223) Update Hadoop versions

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282572#comment-16282572
 ] 

ASF GitHub Bot commented on FLINK-8223:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/5137

[FLINK-8223] [build] Update Hadoop versions

## What is the purpose of the change

Update Hadoop minor versions for the Flink 1.5 development cycle.

## Brief change log

Update 2.7.3 to 2.7.4 and 2.8.0 to 2.8.2. See 
http://hadoop.apache.org/releases.html

## Verifying this change

This change is already covered by existing tests, such as Hadoop tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (**yes** / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 8223_update_hadoop_versions

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5137.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5137


commit 34b58f6ce8b66ea8f3096a4792ba108b8df4dbcf
Author: Greg Hogan 
Date:   2017-12-07T18:29:29Z

[FLINK-8223] [build] Update Hadoop versions




> Update Hadoop versions
> --
>
> Key: FLINK-8223
> URL: https://issues.apache.org/jira/browse/FLINK-8223
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>Priority: Trivial
>
> Update 2.7.3 to 2.7.4 and 2.8.0 to 2.8.2. See 
> http://hadoop.apache.org/releases.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)




[jira] [Commented] (FLINK-8222) Update Scala version

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282490#comment-16282490
 ] 

ASF GitHub Bot commented on FLINK-8222:
---

GitHub user greghogan opened a pull request:

https://github.com/apache/flink/pull/5136

[FLINK-8222] [build] Update Scala version

## What is the purpose of the change

This is an incremental upgrade to the Scala security release 2.11.12.

"A privilege escalation vulnerability (CVE-2017-15288) has been identified 
in the Scala compilation daemon."

https://www.scala-lang.org/news/security-update-nov17.html

## Brief change log

Updated scala version in both parent `pom.xml` and in 
flink-quickstart-scala `pom.xml`.

## Verifying this change

This change is already covered by existing tests, such as *(please describe 
tests)*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (**yes** / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/greghogan/flink 8222_update_scala_version

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5136.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5136


commit 7136e7f2def4c2d5a694729e3aefc1e4f54dfa22
Author: Greg Hogan 
Date:   2017-12-07T18:22:00Z

[FLINK-8222] [build] Update Scala version

This is an incremental upgrade to the Scala security release 2.11.12.




> Update Scala version
> 
>
> Key: FLINK-8222
> URL: https://issues.apache.org/jira/browse/FLINK-8222
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Greg Hogan
>Assignee: Greg Hogan
>
> Update Scala to version {{2.11.12}}. I don't believe this affects the Flink 
> distribution but rather anyone who is compiling Flink or a 
> Flink-quickstart-derived program on a shared system.
> "A privilege escalation vulnerability (CVE-2017-15288) has been identified in 
> the Scala compilation daemon."
> https://www.scala-lang.org/news/security-update-nov17.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)




[jira] [Commented] (FLINK-8080) Remove need for "metrics.reporters"

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282444#comment-16282444
 ] 

ASF GitHub Bot commented on FLINK-8080:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5099#discussion_r155618187
  
--- Diff: docs/monitoring/metrics.md ---
@@ -329,11 +329,11 @@ or by assigning unique names to jobs and operators.
 Metrics can be exposed to an external system by configuring one or several 
reporters in `conf/flink-conf.yaml`. These
 reporters will be instantiated on each job and task manager when they are 
started.
 
-- `metrics.reporters`: The list of named reporters.
 - `metrics.reporter..`: Generic setting `` for the 
reporter named ``.
 - `metrics.reporter..class`: The reporter class to use for the 
reporter named ``.
 - `metrics.reporter..interval`: The reporter interval to use for the 
reporter named ``.
 - `metrics.reporter..scope.delimiter`: The delimiter to use for the 
identifier (default value use `metrics.scope.delimiter`) for the reporter named 
``.
+- `metrics.reporters`: (optional) An include list for reporters to 
instantiate. By default all configured reporters will be used.
--- End diff --

"An include list for reporters" -> "A comma-separated list of reporter 
names"? Not sure if we need to specify "comma-separated".


> Remove need for "metrics.reporters"
> ---
>
> Key: FLINK-8080
> URL: https://issues.apache.org/jira/browse/FLINK-8080
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration, Metrics
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> Currently, in order to use a reporter one must configure something like this:
> {code}
> metrics.reporters: jmx
> metrics.reporter.jmx.class: ...
> {code}
> It would be neat if users did not have to set {{metrics.reporters}}. We can 
> accomplish this by scanning the configuration for configuration keys 
> starting with {{metrics.reporter.}} and using the next word as the reporter 
> name.
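The scanning idea amounts to extracting the segment between the `metrics.reporter.` prefix and the next `.` for every matching key. A minimal sketch, assuming a plain key/value map stands in for Flink's `Configuration` (the real logic lives in `MetricRegistryConfiguration`):

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class ReporterNameScanSketch {
    static final String PREFIX = "metrics.reporter.";

    // Derive reporter names from keys like
    // "metrics.reporter.jmx.class" -> "jmx".
    static Set<String> scanReporterNames(Map<String, String> config) {
        Set<String> names = new LinkedHashSet<>();
        for (String key : config.keySet()) {
            if (key.startsWith(PREFIX)) {
                String rest = key.substring(PREFIX.length());
                int dot = rest.indexOf('.');
                if (dot > 0) {
                    names.add(rest.substring(0, dot));
                }
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("metrics.reporter.jmx.class", "org.example.JMXReporter");
        config.put("metrics.reporter.graphite.host", "localhost");
        config.put("metrics.scope.delimiter", "."); // ignored: wrong prefix
        System.out.println(scanReporterNames(config)); // [jmx, graphite]
    }
}
```

With this in place, `metrics.reporters` becomes a purely optional include list that filters the scanned names.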



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5099: [FLINK-8080][metrics] Remove need for "metrics.rep...

2017-12-07 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5099#discussion_r155625966
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
 ---
@@ -76,8 +76,27 @@ public void testIsShutdown() {
public void testReporterInstantiation() {
Configuration config = new Configuration();
 
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
+
+   MetricRegistryImpl metricRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+   assertTrue(metricRegistry.getReporters().size() == 1);
+
+   Assert.assertTrue(TestReporter1.wasOpened);
+
+   metricRegistry.shutdown();
+   }
+
+   /**
+* Verifies that the reporter name list is correctly used to determine 
which reporters should be instantiated.
+*/
+   @Test
+   public void testReporterInclusion() {
+   Configuration config = new Configuration();
+
config.setString(MetricOptions.REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
--- End diff --

Use a `TestReporter2` and verify not opened? Do we need both 
`testReporterInstantiation` and `testReporterInclusion`?


---


[GitHub] flink pull request #5099: [FLINK-8080][metrics] Remove need for "metrics.rep...

2017-12-07 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5099#discussion_r155620220
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
 ---
@@ -108,15 +118,36 @@ public static MetricRegistryConfiguration 
fromConfiguration(Configuration config
delim = '.';
}
 
-   final String definedReporters = 
configuration.getString(MetricOptions.REPORTERS_LIST);
+   Set includedReporters = 
reporterListPattern.splitAsStream(configuration.getString(MetricOptions.REPORTERS_LIST,
 ""))
+   .collect(Collectors.toSet());
+
+   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+   Set namedReporters = new TreeSet<>(String::compareTo);
+   // scan entire configuration for "metric.reporter" keys and 
parse individual reporter configurations
+   for (String key : configuration.keySet()) {
+   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+   Matcher matcher = 
reporterClassPattern.matcher(key);
+   if (matcher.matches()) {
+   String reporterName = matcher.group(1);
+   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+   if 
(namedReporters.contains(reporterName)) {
+   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+   } else {
+   
namedReporters.add(reporterName);
+   }
+   } else {
+   LOG.info("Excluding reporter 
{}.", reporterName);
--- End diff --

Log the reason for excluding the reporter (not in the reporters list)?


---


[jira] [Commented] (FLINK-8080) Remove need for "metrics.reporters"

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282443#comment-16282443
 ] 

ASF GitHub Bot commented on FLINK-8080:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5099#discussion_r155623170
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
 ---
@@ -44,7 +48,13 @@
private static volatile MetricRegistryConfiguration 
defaultConfiguration;
 
// regex pattern to split the defined reporters
-   private static final Pattern splitPattern = 
Pattern.compile("\\s*,\\s*");
+   private static final Pattern reporterListPattern = 
Pattern.compile("\\s*,\\s*");
+
+   // regex pattern to extract the name from reporter configuration keys, 
e.g. "rep" from "metrics.reporter.rep.class"
+   private static final Pattern reporterClassPattern = Pattern.compile(
+   Pattern.quote(ConfigConstants.METRICS_REPORTER_PREFIX) +
+   "([\\S&&[^.]]*)\\." +
--- End diff --

It would be helpful to document that we are intersecting regex character 
classes.
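For readers unfamiliar with the `[\S&&[^.]]` construct being discussed: Java's regex syntax supports character-class intersection with `&&`, so this class means "any non-whitespace character that is also not a dot". A small standalone demonstration (the pattern below mirrors the shape of the key regex but is simplified for illustration):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ClassIntersection {
    public static void main(String[] args) {
        // [\S&&[^.]] = intersection of \S (non-whitespace) and [^.] (not a dot)
        Pattern key = Pattern.compile("metrics\\.reporter\\.([\\S&&[^.]]+)\\.class");

        Matcher m = key.matcher("metrics.reporter.jmx.class");
        System.out.println(m.matches() ? m.group(1) : "no match"); // jmx

        // the captured name cannot contain dots, so a dotted name does not match
        System.out.println(key.matcher("metrics.reporter.a.b.class").matches()); // false
    }
}
```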


> Remove need for "metrics.reporters"
> ---
>
> Key: FLINK-8080
> URL: https://issues.apache.org/jira/browse/FLINK-8080
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration, Metrics
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> Currently, in order to use a reporter one must configure something like this:
> {code}
> metrics.reporters: jmx
> metrics.reporter.jmx.class: ...
> {code}
> It would be neat if users did not have to set {{metrics.reporters}}. We can 
> accomplish this by scanning the configuration for configuration keys 
> starting with {{metrics.reporter.}} and using the next word as a reporter 
> name.





[GitHub] flink pull request #5135: [hotfix] [doc] Fix typo in TaskManager and Environ...

2017-12-07 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5135#discussion_r155627163
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -585,7 +585,7 @@ class TaskManager(
   config.getMaxRegistrationPause().toMilliseconds,
   TimeUnit.MILLISECONDS))
 
-// schedule (with our timeout s delay) a check triggers a new 
registration
+// schedule (with our timeout's delay) a check triggers a new 
registration
--- End diff --

"triggers" -> "to trigger"? Or rewrite like "schedule a check to trigger a 
new registration attempt if not registered by the timeout"?


---


[jira] [Commented] (FLINK-7984) Bump snappy-java to 1.1.4

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282466#comment-16282466
 ] 

ASF GitHub Bot commented on FLINK-7984:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5072
  
@yew1eb have you looked at FLINK-6965?


> Bump snappy-java to 1.1.4
> -
>
> Key: FLINK-7984
> URL: https://issues.apache.org/jira/browse/FLINK-7984
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
> Fix For: 1.5.0
>
>
> Upgrade the snappy java version to 1.1.4(the latest, May, 2017). The older 
> version has some issues like memory leak 
> (https://github.com/xerial/snappy-java/issues/91).
> Snappy Java [Release 
> Notes|https://github.com/xerial/snappy-java/blob/master/Milestone.md]





[jira] [Commented] (FLINK-8080) Remove need for "metrics.reporters"

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282445#comment-16282445
 ] 

ASF GitHub Bot commented on FLINK-8080:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5099#discussion_r155620220
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
 ---
@@ -108,15 +118,36 @@ public static MetricRegistryConfiguration 
fromConfiguration(Configuration config
delim = '.';
}
 
-   final String definedReporters = 
configuration.getString(MetricOptions.REPORTERS_LIST);
+   Set includedReporters = 
reporterListPattern.splitAsStream(configuration.getString(MetricOptions.REPORTERS_LIST,
 ""))
+   .collect(Collectors.toSet());
+
+   // use a TreeSet to make the reporter order deterministic, 
which is useful for testing
+   Set namedReporters = new TreeSet<>(String::compareTo);
+   // scan entire configuration for "metric.reporter" keys and 
parse individual reporter configurations
+   for (String key : configuration.keySet()) {
+   if 
(key.startsWith(ConfigConstants.METRICS_REPORTER_PREFIX)) {
+   Matcher matcher = 
reporterClassPattern.matcher(key);
+   if (matcher.matches()) {
+   String reporterName = matcher.group(1);
+   if (includedReporters.isEmpty() || 
includedReporters.contains(reporterName)) {
+   if 
(namedReporters.contains(reporterName)) {
+   LOG.warn("Duplicate 
class configuration detected for reporter {}.", reporterName);
+   } else {
+   
namedReporters.add(reporterName);
+   }
+   } else {
+   LOG.info("Excluding reporter 
{}.", reporterName);
--- End diff --

Log the reason for excluding the reporter (not in the reporters list)?


> Remove need for "metrics.reporters"
> ---
>
> Key: FLINK-8080
> URL: https://issues.apache.org/jira/browse/FLINK-8080
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration, Metrics
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> Currently, in order to use a reporter one must configure something like this:
> {code}
> metrics.reporters: jmx
> metrics.reporter.jmx.class: ...
> {code}
> It would be neat if users did not have to set {{metrics.reporters}}. We can 
> accomplish this by scanning the configuration for configuration keys 
> starting with {{metrics.reporter.}} and using the next word as a reporter 
> name.





[jira] [Commented] (FLINK-8080) Remove need for "metrics.reporters"

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282446#comment-16282446
 ] 

ASF GitHub Bot commented on FLINK-8080:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5099#discussion_r155625966
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
 ---
@@ -76,8 +76,27 @@ public void testIsShutdown() {
public void testReporterInstantiation() {
Configuration config = new Configuration();
 
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
+
+   MetricRegistryImpl metricRegistry = new 
MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+   assertTrue(metricRegistry.getReporters().size() == 1);
+
+   Assert.assertTrue(TestReporter1.wasOpened);
+
+   metricRegistry.shutdown();
+   }
+
+   /**
+* Verifies that the reporter name list is correctly used to determine 
which reporters should be instantiated.
+*/
+   @Test
+   public void testReporterInclusion() {
+   Configuration config = new Configuration();
+
config.setString(MetricOptions.REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
+   config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter1.class.getName());
--- End diff --

Use a `TestReporter2` and verify not opened? Do we need both 
`testReporterInstantiation` and `testReporterInclusion`?


> Remove need for "metrics.reporters"
> ---
>
> Key: FLINK-8080
> URL: https://issues.apache.org/jira/browse/FLINK-8080
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration, Metrics
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> Currently, in order to use a reporter one must configure something like this:
> {code}
> metrics.reporters: jmx
> metrics.reporter.jmx.class: ...
> {code}
> It would be neat if users did not have to set {{metrics.reporters}}. We can 
> accomplish this by scanning the configuration for configuration keys 
> starting with {{metrics.reporter.}} and using the next word as a reporter 
> name.





[GitHub] flink pull request #5099: [FLINK-8080][metrics] Remove need for "metrics.rep...

2017-12-07 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5099#discussion_r155618187
  
--- Diff: docs/monitoring/metrics.md ---
@@ -329,11 +329,11 @@ or by assigning unique names to jobs and operators.
 Metrics can be exposed to an external system by configuring one or several 
reporters in `conf/flink-conf.yaml`. These
 reporters will be instantiated on each job and task manager when they are 
started.
 
-- `metrics.reporters`: The list of named reporters.
 - `metrics.reporter.<name>.<parameter>`: Generic setting `<parameter>` for the 
reporter named `<name>`.
 - `metrics.reporter.<name>.class`: The reporter class to use for the 
reporter named `<name>`.
 - `metrics.reporter.<name>.interval`: The reporter interval to use for the 
reporter named `<name>`.
 - `metrics.reporter.<name>.scope.delimiter`: The delimiter to use for the 
identifier (default value use `metrics.scope.delimiter`) for the reporter named 
`<name>`.
+- `metrics.reporters`: (optional) An include list for reporters to 
instantiate. By default all configured reporters will be used.
--- End diff --

"An include list for reporters" -> "A comma-separated list of reporter 
names"? Not sure if we need to specify "comma-separated".


---


[GitHub] flink pull request #5099: [FLINK-8080][metrics] Remove need for "metrics.rep...

2017-12-07 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5099#discussion_r155623170
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryConfiguration.java
 ---
@@ -44,7 +48,13 @@
private static volatile MetricRegistryConfiguration 
defaultConfiguration;
 
// regex pattern to split the defined reporters
-   private static final Pattern splitPattern = 
Pattern.compile("\\s*,\\s*");
+   private static final Pattern reporterListPattern = 
Pattern.compile("\\s*,\\s*");
+
+   // regex pattern to extract the name from reporter configuration keys, 
e.g. "rep" from "metrics.reporter.rep.class"
+   private static final Pattern reporterClassPattern = Pattern.compile(
+   Pattern.quote(ConfigConstants.METRICS_REPORTER_PREFIX) +
+   "([\\S&&[^.]]*)\\." +
--- End diff --

It would be helpful to document that we are intersecting regex character 
classes.


---


[GitHub] flink pull request #5135: [hotfix] [doc] Fix typo in TaskManager and Environ...

2017-12-07 Thread casidiablo
GitHub user casidiablo opened a pull request:

https://github.com/apache/flink/pull/5135

[hotfix] [doc] Fix typo in TaskManager and EnvironmentInformation doc



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/casidiablo/flink hotfix/comments-typo-tm

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5135.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5135


commit 9f59d9bd9dea0f95aed788c1b2a5a20a995ca07c
Author: Cristian 
Date:   2017-12-07T19:00:00Z

[hotfix] [doc] Fix typo in TaskManager and EnvironmentInformation doc




---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155604499
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, 
locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotFuture;
--- End diff --

The variable name is confusing. `multiTaskSlotFuture` is not of type 
`Future`.


---


[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282293#comment-16282293
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155604499
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, 
locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotFuture;
--- End diff --

The variable name is confusing. `multiTaskSlotFuture` is not of type 
`Future`.


> Add support for scheduling with slot sharing
> 
>
> Key: FLINK-7956
> URL: https://issues.apache.org/jira/browse/FLINK-7956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scheduler
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add 
> support for scheduling with slot sharing to the {{SlotPool}}. This will also 
> allow us to run all the IT cases based on the {{AbstractTestBase}} on the 
> Flip-6 {{MiniCluster}}.





[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282300#comment-16282300
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155605251
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java 
---
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A logical slot represents a resource on a TaskManager into
+ * which a single task can be deployed.
+ */
+public interface LogicalSlot {
+
+Payload TERMINATED_PAYLOAD = new Payload() {
+
+   private final CompletableFuture COMPLETED_TERMINATION_FUTURE 
= CompletableFuture.completedFuture(null);
--- End diff --

nit: `COMPLETED_TERMINATION_FUTURE` should be camel cased because it is not 
actually a constant (not static).


> Add support for scheduling with slot sharing
> 
>
> Key: FLINK-7956
> URL: https://issues.apache.org/jira/browse/FLINK-7956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scheduler
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add 
> support for scheduling with slot sharing to the {{SlotPool}}. This will also 
> allow us to run all the IT cases based on the {{AbstractTestBase}} on the 
> Flip-6 {{MiniCluster}}.





[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155605251
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java 
---
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A logical slot represents a resource on a TaskManager into
+ * which a single task can be deployed.
+ */
+public interface LogicalSlot {
+
+Payload TERMINATED_PAYLOAD = new Payload() {
+
+   private final CompletableFuture COMPLETED_TERMINATION_FUTURE 
= CompletableFuture.completedFuture(null);
--- End diff --

nit: `COMPLETED_TERMINATION_FUTURE` should be camel cased because it is not 
actually a constant (not static).


---


[jira] [Created] (FLINK-8222) Update Scala version

2017-12-07 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-8222:
-

 Summary: Update Scala version
 Key: FLINK-8222
 URL: https://issues.apache.org/jira/browse/FLINK-8222
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Affects Versions: 1.4.0
Reporter: Greg Hogan
Assignee: Greg Hogan


Update Scala to version {{2.11.12}}. I don't believe this affects the Flink 
distribution but rather anyone who is compiling Flink or a 
Flink-quickstart-derived program on a shared system.

"A privilege escalation vulnerability (CVE-2017-15288) has been identified in 
the Scala compilation daemon."

https://www.scala-lang.org/news/security-update-nov17.html





[jira] [Created] (FLINK-8223) Update Hadoop versions

2017-12-07 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-8223:
-

 Summary: Update Hadoop versions
 Key: FLINK-8223
 URL: https://issues.apache.org/jira/browse/FLINK-8223
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Affects Versions: 1.5.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Trivial


Update 2.7.3 to 2.7.4 and 2.8.0 to 2.8.2. See 
http://hadoop.apache.org/releases.html





[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155590317
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, 
locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotFuture;
+
+   try {
+   if (task.getCoLocationConstraint() != null) {
+   multiTaskSlotFuture = 
allocateCoLocatedMultiTaskSlot(
+   task.getCoLocationConstraint(),
+   multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   } else {
+   multiTaskSlotFuture = 
allocateMultiTaskSlot(
+   task.getJobVertexId(), 
multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   }
+   } catch (NoResourceAvailableException 
noResourceException) {
+   return 
FutureUtils.completedExceptionally(noResourceException);
+   }
+
+   // sanity check
+   
Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+   final SlotSharingManager.SingleTaskSlot leave = 
multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
--- End diff --

nit: variable name should be *leaf* 

https://www.dict.cc/?s=leaf


---
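The allocation path reviewed above does two things: it lazily creates one sharing manager per slot sharing group via `computeIfAbsent`, and it then branches on whether the task carries a co-location constraint. A minimal, self-contained sketch of that control flow, using hypothetical stand-in types (`SlotRequest`, string-valued slots) rather than Flink's actual `SlotPool`/`SlotSharingManager` API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class SlotSharingSketch {
    // Hypothetical stand-in for Flink's ScheduledUnit; names are illustrative only.
    record SlotRequest(String sharingGroupId, String coLocationConstraint, String jobVertexId) {}

    // one "manager" per sharing group, mirroring the slotSharingManagers map in the PR
    static final Map<String, List<String>> managers = new HashMap<>();

    static CompletableFuture<String> allocate(SlotRequest req) {
        if (req.sharingGroupId() != null) {
            // lazily create the per-group manager (computeIfAbsent in the real code)
            List<String> manager = managers.computeIfAbsent(req.sharingGroupId(), id -> new ArrayList<>());
            String slot = (req.coLocationConstraint() != null)
                ? "colocated:" + req.coLocationConstraint() // co-located tasks share one multi-task slot
                : "shared:" + req.jobVertexId();            // otherwise the slot is keyed by job vertex
            manager.add(slot);
            return CompletableFuture.completedFuture(slot);
        }
        // no sharing group: fall through to a dedicated slot
        return CompletableFuture.completedFuture("dedicated");
    }

    public static void main(String[] args) {
        System.out.println(allocate(new SlotRequest("g1", null, "map")).join());
        System.out.println(allocate(new SlotRequest("g1", "loc1", "reduce")).join());
        System.out.println(managers);
    }
}
```

This only sketches the branching; the real `SlotPool` additionally resolves locality preferences, supports queued scheduling, and returns futures that may complete later rather than immediately.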


[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282216#comment-16282216
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155590317
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
 ---
@@ -266,104 +279,367 @@ public void disconnectResourceManager() {
// 

 
@Override
-   public CompletableFuture allocateSlot(
-   SlotRequestID requestId,
-   ScheduledUnit task,
-   ResourceProfile resources,
-   Iterable locationPreferences,
+   public CompletableFuture allocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit scheduledUnit,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling,
Time timeout) {
 
-   return internalAllocateSlot(requestId, task, resources, 
locationPreferences);
+   return internalAllocateSlot(
+   slotRequestId,
+   scheduledUnit,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
}
 
-   @Override
-   public void returnAllocatedSlot(Slot slot) {
-   internalReturnAllocatedSlot(slot);
+   private CompletableFuture internalAllocateSlot(
+   SlotRequestId slotRequestId,
+   ScheduledUnit task,
+   ResourceProfile resourceProfile,
+   Collection locationPreferences,
+   boolean allowQueuedScheduling) {
+
+   final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
+
+   if (slotSharingGroupId != null) {
+   // allocate slot with slot sharing
+   final SlotSharingManager multiTaskSlotManager = 
slotSharingManagers.computeIfAbsent(
+   slotSharingGroupId,
+   id -> new SlotSharingManager(
+   id,
+   this,
+   providerAndOwner));
+
+   final SlotSharingManager.MultiTaskSlotLocality 
multiTaskSlotFuture;
+
+   try {
+   if (task.getCoLocationConstraint() != null) {
+   multiTaskSlotFuture = 
allocateCoLocatedMultiTaskSlot(
+   task.getCoLocationConstraint(),
+   multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   } else {
+   multiTaskSlotFuture = 
allocateMultiTaskSlot(
+   task.getJobVertexId(), 
multiTaskSlotManager,
+   resourceProfile,
+   locationPreferences,
+   allowQueuedScheduling);
+   }
+   } catch (NoResourceAvailableException 
noResourceException) {
+   return 
FutureUtils.completedExceptionally(noResourceException);
+   }
+
+   // sanity check
+   
Preconditions.checkState(!multiTaskSlotFuture.getMultiTaskSlot().contains(task.getJobVertexId()));
+
+   final SlotSharingManager.SingleTaskSlot leave = 
multiTaskSlotFuture.getMultiTaskSlot().allocateSingleTaskSlot(
--- End diff --

nit: variable name should be *leaf* 

https://www.dict.cc/?s=leaf


> Add support for scheduling with slot sharing
> 
>
> Key: FLINK-7956
> URL: https://issues.apache.org/jira/browse/FLINK-7956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scheduler
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add 
> support for scheduling with slot sharing to the {{SlotPool}}. This 

[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282181#comment-16282181
 ] 

ASF GitHub Bot commented on FLINK-8220:
---

Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5134
  
Or if `flink-benchmarks` does not need to be distributed then that code 
could be contributed to the main Flink repository.


> Implement set of network throughput benchmarks in Flink
> ---
>
> Key: FLINK-8220
> URL: https://issues.apache.org/jira/browse/FLINK-8220
> Project: Flink
>  Issue Type: New Feature
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Benchmarks should be defined and implemented in flink project and they will 
> be executed in {{flink-benchmarks}} project.
> Configurable parameters: number of record writers and number of channels.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5047: Code refine of WordWithCount

2017-12-07 Thread harborl
Github user harborl commented on the issue:

https://github.com/apache/flink/pull/5047
  
Yes, after some learning, it turns out that, in this project, POJO is a 
reasonable style for parameters. Sorry for the interruption; I am excited 
to learn more.


---


[jira] [Commented] (FLINK-7129) Dynamically changing patterns

2017-12-07 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282185#comment-16282185
 ] 

Fabian Hueske commented on FLINK-7129:
--

It won't go into Flink 1.4.0 because feature freeze was more than 4 weeks ago 
and the release is currently voted on. 
I don't know about plans for 1.5.0.

> Dynamically changing patterns
> -
>
> Key: FLINK-7129
> URL: https://issues.apache.org/jira/browse/FLINK-7129
> Project: Flink
>  Issue Type: New Feature
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>
> An umbrella task for introducing mechanism for injecting patterns through 
> coStream



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5134: [FLINK-8220] Implement set of network benchmarks

2017-12-07 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5134
  
Or if `flink-benchmarks` does not need to be distributed then that code 
could be contributed to the main Flink repository.


---


[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16282080#comment-16282080
 ] 

ASF GitHub Bot commented on FLINK-8220:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5134
  
Why do you want to add these to Flink when they are only used by an 
external repository? Wouldn't it make sense to consolidate all benchmarking 
code into the flink-benchmark repo?


> Implement set of network throughput benchmarks in Flink
> ---
>
> Key: FLINK-8220
> URL: https://issues.apache.org/jira/browse/FLINK-8220
> Project: Flink
>  Issue Type: New Feature
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Benchmarks should be defined and implemented in flink project and they will 
> be executed in {{flink-benchmarks}} project.
> Configurable parameters: number of record writers and number of channels.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5134: [FLINK-8220] Implement set of network benchmarks

2017-12-07 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5134
  
Why do you want to add these to Flink when they are only used by an 
external repository? Wouldn't it make sense to consolidate all benchmarking 
code into the flink-benchmark repo?


---


[jira] [Commented] (FLINK-8220) Implement set of network throughput benchmarks in Flink

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281998#comment-16281998
 ] 

ASF GitHub Bot commented on FLINK-8220:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5134

[FLINK-8220] Implement set of network benchmarks

This PR implements sets of network benchmarks that are intended to simulate 
various workloads for the network stack. The benchmarks will be executed by 
the `flink-benchmarks` project. We want to keep the code of the benchmarks 
here, since the components they use are not part of the public API and we 
don't want them breaking unknowingly on changes in the Flink code.
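
As a rough, self-contained illustration of what a network throughput 
benchmark measures (hypothetical names and a plain-Java harness, not the 
PR's actual JMH-based classes): push fixed-size records through a reusable 
buffer and report records per second.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a network throughput benchmark: serialize
// fixed-size records into a reusable buffer and count records/second.
public class ThroughputSketch {

    static final int RECORD_SIZE = 8;          // one long per record
    static final int BUFFER_SIZE = 32 * 1024;  // simulated network buffer

    // Writes numRecords longs, "flushing" (clearing) the buffer when full,
    // and returns the achieved records-per-second rate.
    static double measure(long numRecords) {
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        long start = System.nanoTime();
        for (long i = 0; i < numRecords; i++) {
            if (buffer.remaining() < RECORD_SIZE) {
                buffer.clear(); // stand-in for handing the buffer to the network stack
            }
            buffer.putLong(i);
        }
        long nanos = System.nanoTime() - start;
        return numRecords / (nanos / 1_000_000_000.0);
    }

    public static void main(String[] args) {
        System.out.printf("%.0f records/s%n", measure(10_000_000L));
    }
}
```

The real benchmarks vary the number of record writers and channels; this 
sketch only shows the inner measurement loop.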

## Verifying this change

This change adds two tests ensuring that the benchmarks compile and 
execute as expected without exceptions.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink network-benchmarks

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5134.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5134


commit 5151d74980fd5995c718173ea03db5049ed0ada2
Author: Piotr Nowojski 
Date:   2017-11-28T15:49:37Z

[FLINK-8172][network] Write to memorySegment directly in RecordSerializer

This increases the throughput of the network stack by a factor of 2, because 
previously the method getMemorySegment() was called twice per record, and it 
is a method synchronized on recycleLock. The RecordSerializer is the sole 
owner of the Buffer at this point, so synchronisation is not needed.
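
The idea behind this commit can be sketched with simplified stand-ins (not 
Flink's actual MemorySegment/Buffer classes): instead of going through a 
synchronized getter for every record, the serializer fetches the segment 
reference once while it is the sole owner of the buffer.

```java
import java.nio.ByteBuffer;

// Simplified stand-in for the Buffer/MemorySegment relationship.
class SimpleBuffer {
    private final ByteBuffer segment = ByteBuffer.allocate(1024);

    // The costly path: synchronized, so calling it per record adds overhead.
    synchronized ByteBuffer getMemorySegment() {
        return segment;
    }
}

public class SerializerSketch {
    static int serialize(SimpleBuffer buffer, long[] records) {
        // Cache the segment once: the serializer is the sole owner of the
        // buffer here, so repeated synchronized lookups are unnecessary.
        ByteBuffer segment = buffer.getMemorySegment();
        for (long record : records) {
            segment.putLong(record);
        }
        return segment.position();
    }

    public static void main(String[] args) {
        int written = serialize(new SimpleBuffer(), new long[]{1, 2, 3});
        System.out.println(written); // 3 records * 8 bytes = 24
    }
}
```
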

commit 42a6a0daefdb9e9b1185a8195c28eccf055a83ce
Author: Piotr Nowojski 
Date:   2017-11-28T07:41:40Z

[hotfix][test] Add timeout for joining with CheckedThread

commit b777e4f44344f12d0196c77515aac0815e7f31c5
Author: Piotr Nowojski 
Date:   2017-11-28T07:42:19Z

[hotfix][util] Added suppressExceptions for lambda functions
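
The hotfix's actual helper is not shown in this thread; one plausible shape 
for a `suppressExceptions`-style utility (illustrative names, not Flink's 
real utility class) is a wrapper that lets a lambda declaring checked 
exceptions be used where unchecked code is expected:

```java
// Hypothetical sketch: run a throwing lambda and rethrow any checked
// exception as an unchecked one.
public class ExceptionUtilSketch {

    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    static void suppressExceptions(ThrowingRunnable action) {
        try {
            action.run();
        } catch (RuntimeException e) {
            throw e; // already unchecked, pass through
        } catch (Exception e) {
            throw new RuntimeException(e); // wrap checked exceptions
        }
    }

    public static void main(String[] args) {
        // A lambda declaring a checked exception can now be used directly.
        suppressExceptions(() -> {
            if (args.length > 100) {
                throw new java.io.IOException("never thrown here");
            }
        });
        System.out.println("ok");
    }
}
```
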

commit 28e25a2bd0af704af1fa1b084be6b0cfa05d9928
Author: Piotr Nowojski 
Date:   2017-12-07T09:03:32Z

[FLINK-XXX][network-benchmarks] Define network benchmarks in Flink project

commit 4f88c6522e176bc20ae53c67136e53dc7970c9e0
Author: Nico Kruber 
Date:   2017-12-07T09:03:49Z

[FLINK-XXX][network-benchmarks] Define latency network benchmarks in Flink 
project




> Implement set of network throughput benchmarks in Flink
> ---
>
> Key: FLINK-8220
> URL: https://issues.apache.org/jira/browse/FLINK-8220
> Project: Flink
>  Issue Type: New Feature
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Benchmarks should be defined and implemented in flink project and they will 
> be executed in {{flink-benchmarks}} project.
> Configurable parameters: number of record writers and number of channels.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5134: [FLINK-8220] Implement set of network benchmarks

2017-12-07 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5134

[FLINK-8220] Implement set of network benchmarks

This PR implements sets of network benchmarks that are intended to simulate 
various workloads for the network stack. The benchmarks will be executed by 
the `flink-benchmarks` project. We want to keep the code of the benchmarks 
here, since the components they use are not part of the public API and we 
don't want them breaking unknowingly on changes in the Flink code.

## Verifying this change

This change adds two tests ensuring that the benchmarks compile and 
execute as expected without exceptions.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink network-benchmarks

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5134.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5134


commit 5151d74980fd5995c718173ea03db5049ed0ada2
Author: Piotr Nowojski 
Date:   2017-11-28T15:49:37Z

[FLINK-8172][network] Write to memorySegment directly in RecordSerializer

This increases the throughput of the network stack by a factor of 2, because 
previously the method getMemorySegment() was called twice per record, and it 
is a method synchronized on recycleLock. The RecordSerializer is the sole 
owner of the Buffer at this point, so synchronisation is not needed.

commit 42a6a0daefdb9e9b1185a8195c28eccf055a83ce
Author: Piotr Nowojski 
Date:   2017-11-28T07:41:40Z

[hotfix][test] Add timeout for joining with CheckedThread

commit b777e4f44344f12d0196c77515aac0815e7f31c5
Author: Piotr Nowojski 
Date:   2017-11-28T07:42:19Z

[hotfix][util] Added suppressExceptions for lambda functions

commit 28e25a2bd0af704af1fa1b084be6b0cfa05d9928
Author: Piotr Nowojski 
Date:   2017-12-07T09:03:32Z

[FLINK-XXX][network-benchmarks] Define network benchmarks in Flink project

commit 4f88c6522e176bc20ae53c67136e53dc7970c9e0
Author: Nico Kruber 
Date:   2017-12-07T09:03:49Z

[FLINK-XXX][network-benchmarks] Define latency network benchmarks in Flink 
project




---


[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281986#comment-16281986
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155549755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java 
---
@@ -32,6 +34,20 @@
  */
 public interface LogicalSlot {
 
+Payload TERMINATED_PAYLOAD = new Payload() {
+
+   private final CompletableFuture COMPLETED_TERMINATION_FUTURE = CompletableFuture.completedFuture(null);
--- End diff --

nit: `COMPLETED_TERMINATION_FUTURE` should be camel cased because is not 
actually a constant (not static).
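
The convention behind this nit, sketched with an illustrative class (not the 
PR's code): SCREAMING_SNAKE_CASE is reserved for `static final` constants, 
while a non-static final field gets camelCase even if it is initialized once.

```java
import java.util.concurrent.CompletableFuture;

// Illustrates the Java naming convention referenced in the review comment.
public class NamingSketch {

    // static final: a true constant, so SCREAMING_SNAKE_CASE is appropriate.
    static final CompletableFuture<Void> COMPLETED_FUTURE =
            CompletableFuture.completedFuture(null);

    // non-static final: an instance field, so camelCase is appropriate,
    // even though it is also assigned only once.
    private final CompletableFuture<Void> completedTerminationFuture =
            CompletableFuture.completedFuture(null);

    boolean isTerminated() {
        return completedTerminationFuture.isDone();
    }

    public static void main(String[] args) {
        System.out.println(new NamingSketch().isTerminated()); // true
    }
}
```
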


> Add support for scheduling with slot sharing
> 
>
> Key: FLINK-7956
> URL: https://issues.apache.org/jira/browse/FLINK-7956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scheduler
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add 
> support for scheduling with slot sharing to the {{SlotPool}}. This will also 
> allow us to run all the IT cases based on the {{AbstractTestBase}} on the 
> Flip-6 {{MiniCluster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155549755
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LogicalSlot.java 
---
@@ -32,6 +34,20 @@
  */
 public interface LogicalSlot {
 
+Payload TERMINATED_PAYLOAD = new Payload() {
+
+   private final CompletableFuture COMPLETED_TERMINATION_FUTURE = CompletableFuture.completedFuture(null);
--- End diff --

nit: `COMPLETED_TERMINATION_FUTURE` should be camel cased because is not 
actually a constant (not static).


---


[jira] [Created] (FLINK-8221) Implement set of network latency benchmarks in Flink

2017-12-07 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-8221:
-

 Summary: Implement set of network latency benchmarks in Flink
 Key: FLINK-8221
 URL: https://issues.apache.org/jira/browse/FLINK-8221
 Project: Flink
  Issue Type: New Feature
  Components: Network
Reporter: Piotr Nowojski
Assignee: Nico Kruber






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8220) Implement set of network throughput benchmarks in Flink

2017-12-07 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-8220:
--
Summary: Implement set of network throughput benchmarks in Flink  (was: 
Define set of network throughput benchmarks in Flink)

> Implement set of network throughput benchmarks in Flink
> ---
>
> Key: FLINK-8220
> URL: https://issues.apache.org/jira/browse/FLINK-8220
> Project: Flink
>  Issue Type: New Feature
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Benchmarks should be defined and implemented in flink project and they will 
> be executed in {{flink-benchmarks}} project.
> Configurable parameters: number of record writers and number of channels.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8220) Define set of network throughput benchmarks in Flink

2017-12-07 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski updated FLINK-8220:
--
Description: 
Benchmarks should be defined and implemented in flink project and they will be 
executed in {{flink-benchmarks}} project.

Configurable parameters: number of record writers and number of channels.

> Define set of network throughput benchmarks in Flink
> 
>
> Key: FLINK-8220
> URL: https://issues.apache.org/jira/browse/FLINK-8220
> Project: Flink
>  Issue Type: New Feature
>  Components: Network
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Benchmarks should be defined and implemented in flink project and they will 
> be executed in {{flink-benchmarks}} project.
> Configurable parameters: number of record writers and number of channels.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8220) Define set of network throughput benchmarks in Flink

2017-12-07 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-8220:
-

 Summary: Define set of network throughput benchmarks in Flink
 Key: FLINK-8220
 URL: https://issues.apache.org/jira/browse/FLINK-8220
 Project: Flink
  Issue Type: New Feature
  Components: Network
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski






--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8078) Decouple Execution from actual slot implementation

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281918#comment-16281918
 ] 

ASF GitHub Bot commented on FLINK-8078:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5086
  
LGTM


> Decouple Execution from actual slot implementation
> --
>
> Key: FLINK-8078
> URL: https://issues.apache.org/jira/browse/FLINK-8078
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to plug in a different slot implementation, we should introduce a 
> slot interface which abstracts away the implementation details of 
> {{SimpleSlot}} wrt {{Execution}}. The reason this is necessary is to provide 
> a simpler slot implementation for Flip-6 since all allocation/release logic 
> will go through the {{SlotPool}}. Thus, we no longer need the concurrent 
> structure of {{Slot}}, {{SharedSlot}}, {{SimpleSlot}} and 
> {{SlotSharingGroupAssignment}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5086: [FLINK-8078] Introduce LogicalSlot interface

2017-12-07 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5086
  
LGTM


---


[jira] [Created] (FLINK-8219) Kinesis Connector metrics

2017-12-07 Thread Gary Oslon (JIRA)
Gary Oslon created FLINK-8219:
-

 Summary: Kinesis Connector metrics
 Key: FLINK-8219
 URL: https://issues.apache.org/jira/browse/FLINK-8219
 Project: Flink
  Issue Type: Bug
  Components: Kinesis Connector
Reporter: Gary Oslon


I don't see in the documentation which metrics are emitted by the Kinesis 
Connector. When working with Amazon's Kinesis Client Library, it is common to 
get metrics via {{MillisBehindLatest}}, which tells you whether your 
processor is delayed.

Where are those metrics being published?
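
As general background (a hedged toy sketch, not the connector's actual code 
and not Flink's real metric API): a consumer can expose a lag value such as 
{{MillisBehindLatest}} as a gauge that a metrics system polls on demand.

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy gauge: the consumer updates a lag value, the metrics system reads it.
public class LagGaugeSketch {

    interface Gauge<T> {
        T getValue();
    }

    static class Consumer {
        private final AtomicLong millisBehindLatest = new AtomicLong(0);

        // Called whenever a batch of records is fetched, with the lag the
        // backend reported for that fetch.
        void onBatch(long reportedLagMillis) {
            millisBehindLatest.set(reportedLagMillis);
        }

        Gauge<Long> lagGauge() {
            return millisBehindLatest::get;
        }
    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        Gauge<Long> gauge = consumer.lagGauge();
        consumer.onBatch(1500);
        System.out.println(gauge.getValue()); // 1500
    }
}
```
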



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5133: [hotfix] Fix typo in AkkaUtils method

2017-12-07 Thread yew1eb
Github user yew1eb commented on the issue:

https://github.com/apache/flink/pull/5133
  
LGTM.


---


[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281885#comment-16281885
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155520946
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to 
run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link 
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the 
leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the 
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link 
MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link 
MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link 
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} 
which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link 
SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link 
MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we 
cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks 
will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
--- End diff --

nit: All fields are commented with non-javadoc comments. Normally comments 
on fields are also done in Javadoc style, e.g., `SlotPool`. Javadoc comments on 
fields are displayed by IntelliJ (`Ctrl + J`).
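
The hierarchy described in the quoted Javadoc can be sketched with a toy 
model (simplified, not the actual SlotSharingManager): a multi-task slot 
accepts at most one child per task or co-location ID, which is what prevents 
two subtasks of the same vertex from sharing one slot.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the MultiTaskSlot/SingleTaskSlot hierarchy: each child is
// keyed by a task (or co-location) ID, and duplicate IDs are rejected.
public class SlotTreeSketch {

    static class MultiTaskSlot {
        private final Map<String, Object> children = new HashMap<>();

        // Returns true if the child was added; false if a child with the
        // same ID already exists in this slot.
        boolean addChild(String taskId, Object child) {
            return children.putIfAbsent(taskId, child) == null;
        }

        boolean contains(String taskId) {
            return children.containsKey(taskId);
        }
    }

    public static void main(String[] args) {
        MultiTaskSlot root = new MultiTaskSlot();
        System.out.println(root.addChild("map-vertex", "slot-1"));    // true
        System.out.println(root.addChild("reduce-vertex", "slot-2")); // true
        System.out.println(root.addChild("map-vertex", "slot-3"));    // false: duplicate ID
    }
}
```
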


> Add support for scheduling with slot sharing
> 
>
> Key: 

[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281883#comment-16281883
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155519870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to 
run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link 
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the 
leave nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which is a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the 
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link 
MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link 
MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link 
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} 
which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link 
SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link 
MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we 
cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks 
will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released 
from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map allTaskSlots;
+
+   // Root nodes which have not been completed because the 

[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281884#comment-16281884
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155528224
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows running
+ * different tasks in the same slot and realizing co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link SlotRequestId} identifying
+ * the request for the TaskSlot and an {@link AbstractID} identifying the task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which can contain
+ * a number of other {@link TaskSlot}s and the latter class represents the leaf nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which has a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link MultiTaskSlot} children if
+ * and only if the task slot does not yet contain another child with the same
+ * {@link AbstractID} identifying the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} which contains
+ * a set of {@link SingleTaskSlot}s on the second layer. Each {@link SingleTaskSlot}
+ * represents a different task.
+ *
+ * Co-location constraints are modeled by adding a {@link MultiTaskSlot} to the root
+ * node. The co-location constraint is uniquely identified by an {@link AbstractID} such
+ * that we cannot add a second co-located {@link MultiTaskSlot} to the same root node.
+ * All co-located tasks will then be added to the co-located multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released 
from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map&lt;SlotRequestId, TaskSlot&gt; allTaskSlots;
+
+   // Root nodes which have not been completed because the 
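The Javadoc above describes a two-level slot hierarchy whose children are keyed by an ID so that the same task or co-location group cannot be added twice. A minimal, hypothetical sketch of that rule (simplified names, not Flink's actual classes):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified sketch of the TaskSlot hierarchy described above:
// a root MultiTaskSlot whose children are keyed by a group id, so a second
// child with the same id (same task / co-location constraint) is rejected.
public class TaskSlotHierarchySketch {

    abstract static class TaskSlot {
        final String groupId; // stands in for the AbstractID in the real code
        TaskSlot(String groupId) { this.groupId = groupId; }
    }

    static class SingleTaskSlot extends TaskSlot { // leaf node
        SingleTaskSlot(String groupId) { super(groupId); }
    }

    static class MultiTaskSlot extends TaskSlot { // inner node
        final Map<String, TaskSlot> children = new HashMap<>();
        MultiTaskSlot(String groupId) { super(groupId); }

        /** Adds a child iff no child with the same group id exists yet. */
        boolean allocateChild(TaskSlot child) {
            return children.putIfAbsent(child.groupId, child) == null;
        }
    }

    static String demo() {
        MultiTaskSlot root = new MultiTaskSlot("root");
        boolean a = root.allocateChild(new SingleTaskSlot("map"));  // accepted
        boolean b = root.allocateChild(new SingleTaskSlot("map"));  // rejected: duplicate id
        boolean c = root.allocateChild(new SingleTaskSlot("sink")); // accepted
        return a + "," + b + "," + c;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true,false,true
    }
}
```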

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155519870
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155528224
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155520946
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
--- End diff --

nit: All fields are commented with non-javadoc comments. Normally comments 
on fields are also done in Javadoc style, e.g., `SlotPool`. Javadoc comments on 
fields are displayed by IntelliJ (`Ctrl + J`).


---
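The review nit above (write field comments in Javadoc style so IDEs display them) can be illustrated with a small sketch; the class and field names are hypothetical:

```java
// Sketch of the review nit above: Javadoc field comments (/** ... */) are picked
// up by IDEs (e.g. IntelliJ's Ctrl+J quick documentation) and the javadoc tool,
// while plain line comments (// ...) are not. Names here are hypothetical.
public class FieldCommentStyle {

    // Non-Javadoc comment: invisible to quick documentation and javadoc output.
    private final int slotCountPlain = 1;

    /** Javadoc comment: shown by IDEs and included in generated documentation. */
    private final int slotCountDocumented = 2;

    int total() { return slotCountPlain + slotCountDocumented; }

    public static void main(String[] args) {
        System.out.println(new FieldCommentStyle().total()); // prints 3
    }
}
```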


[jira] [Commented] (FLINK-8124) EventTimeTrigger (and other triggers) could have less specific generic types

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281876#comment-16281876
 ] 

ASF GitHub Bot commented on FLINK-8124:
---

Github user casidiablo commented on the issue:

https://github.com/apache/flink/pull/5073
  
> I thought this was mainly about making the time triggers more relaxed so 
that they accept a Window in addition to only TimeWindow.

That's exactly what this PR is about.


> EventTimeTrigger (and other triggers) could have less specific generic types
> 
>
> Key: FLINK-8124
> URL: https://issues.apache.org/jira/browse/FLINK-8124
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Cristian
>Priority: Minor
>
> When implementing custom WindowAssigners, it is possible to need different 
> implementations of the {{Window}} class (other than {{TimeWindow}}). In such 
> cases, it is not possible to use the existing triggers (e.g. 
> {{EventTimeTrigger}}) because it extends from {{Trigger&lt;Object, TimeWindow&gt;}} 
> which is (unnecessarily?) specific.
> It should be possible to make that class more generic by using 
> {{EventTimeTrigger extends Trigger&lt;Object, Window&gt;}} instead.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5073: [FLINK-8124] Make Trigger implementations more generic

2017-12-07 Thread casidiablo
Github user casidiablo commented on the issue:

https://github.com/apache/flink/pull/5073
  
> I thought this was mainly about making the time triggers more relaxed so 
that they accept a Window in addition to only TimeWindow.

That's exactly what this PR is about.


---
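A hedged sketch of the generics relaxation under discussion (stub types, not Flink's actual API): widening the trigger's window type parameter from the concrete TimeWindow to any Window subtype lets the same trigger be reused with custom window implementations.

```java
// Hedged sketch (stub types, not Flink's actual classes) of the relaxation
// discussed in FLINK-8124: a trigger parameterized over the Window supertype,
// instead of being bound to TimeWindow, can be reused by custom WindowAssigners
// that define their own Window implementations.
public class GenericTriggerSketch {

    static class Window { }                     // stand-in for Flink's Window
    static class TimeWindow extends Window { }
    static class CountWindow extends Window { } // a custom window type

    // Overly specific: only usable with TimeWindow.
    abstract static class SpecificTrigger { abstract boolean onElement(TimeWindow w); }

    // Relaxed: parameterized over any Window subtype.
    abstract static class RelaxedTrigger<W extends Window> { abstract boolean onElement(W w); }

    static class EventTimeLikeTrigger<W extends Window> extends RelaxedTrigger<W> {
        @Override boolean onElement(W window) { return true; } // toy logic: always fire
    }

    static String demo() {
        // The relaxed trigger works for both built-in and custom window types.
        boolean t = new EventTimeLikeTrigger<TimeWindow>().onElement(new TimeWindow());
        boolean c = new EventTimeLikeTrigger<CountWindow>().onElement(new CountWindow());
        return t + "," + c;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true,true
    }
}
```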


[GitHub] flink pull request #5133: [hotfix] Fix typo in AkkaUtils method

2017-12-07 Thread casidiablo
GitHub user casidiablo opened a pull request:

https://github.com/apache/flink/pull/5133

[hotfix] Fix typo in AkkaUtils method

Also, removed unused code:

- `StandaloneHaServices#RESOURCE_MANAGER_RPC_ENDPOINT_NAME`
- `AkkaRpcServiceUtils#createInetSocketAddressFromAkkaURL()`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/casidiablo/flink hotfix/fix-typo

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5133.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5133






---


[jira] [Commented] (FLINK-8122) Name all table sinks and sources

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281851#comment-16281851
 ] 

ASF GitHub Bot commented on FLINK-8122:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5068
  
Thanks for the review @fhueske.
* I added the `explainSink()` method to make the `TableSink` consistent 
with the `TableSource`. Discarding this method and using the 
`TableConnectorUtil.genRuntimeName()` in each table sink should also be OK.
* Yes, the test sources and sinks are not exposed. I'll revert them.
* The source name was set via this call 
`execEnv.createInput(orcIF).name(explainSource())`. The `createInput()` method 
for a **mocked** `execEnv` will return `null` and cause an NPE.

Thanks,
Xingcan


> Name all table sinks and sources
> 
>
> Key: FLINK-8122
> URL: https://issues.apache.org/jira/browse/FLINK-8122
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>
> Not all table sink and sources have proper names. Therefore, they are 
> displayed as "Unnamed" in the logs and Web UI (e.g. CsvTableSink). We should 
> add names for all built-in connectors. Having information about the table 
> sink name (via {{INSERT INTO}}) would be even better.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5068: [FLINK-8122] [table] Name all built-in table sinks and so...

2017-12-07 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5068
  
Thanks for the review @fhueske.
* I added the `explainSink()` method to make the `TableSink` consistent 
with the `TableSource`. Discarding this method and using the 
`TableConnectorUtil.genRuntimeName()` in each table sink should also be OK.
* Yes, the test sources and sinks are not exposed. I'll revert them.
* The source name was set via this call 
`execEnv.createInput(orcIF).name(explainSource())`. The `createInput()` method 
for a **mocked** `execEnv` will return `null` and cause an NPE.

Thanks,
Xingcan


---


[jira] [Commented] (FLINK-8093) flink job fail because of kafka producer create fail of "javax.management.InstanceAlreadyExistsException"

2017-12-07 Thread dongtingting (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281834#comment-16281834
 ] 

dongtingting commented on FLINK-8093:
-

[~aljoscha] I think the client id counter is thread safe and static. But one 
taskmanager may have multiple slots, and different slots use different 
environments and KafkaProducer classes. So one taskmanager may end up with 
multiple identical client ids, while metrics are registered with the 
sun.jmx.mbeanserver, of which there is only one per JVM. The identical client 
ids then conflict when registering with that single mbeanserver.

We fixed this problem in user code by setting:
properties.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "producer-" + topic + 
timestamp);
This avoids the conflict.

In addition, we want to modify the Flink code to avoid the conflict more generally.
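The workaround above (a unique client.id per producer so the JMX MBean registration cannot collide) can be sketched without the Kafka dependency; `"client.id"` is the value of Kafka's `ProducerConfig.CLIENT_ID_CONFIG`, and the suffix scheme below is illustrative.

```java
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the workaround described above: give every producer instance a
// unique client.id so the JMX MBean name
// kafka.producer:type=producer-metrics,client-id=<id> cannot collide.
// "client.id" is the literal value of ProducerConfig.CLIENT_ID_CONFIG;
// the topic + counter suffix here is illustrative (a timestamp also works).
public class UniqueClientId {

    private static final AtomicInteger COUNTER = new AtomicInteger();

    static Properties producerProperties(String topic) {
        Properties properties = new Properties();
        properties.setProperty("client.id",
                "producer-" + topic + "-" + COUNTER.incrementAndGet());
        return properties;
    }

    public static void main(String[] args) {
        String first = producerProperties("dp_client_log").getProperty("client.id");
        String second = producerProperties("dp_client_log").getProperty("client.id");
        System.out.println(!first.equals(second)); // prints true
    }
}
```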

> flink job fail because of kafka producer create fail of 
> "javax.management.InstanceAlreadyExistsException"
> -
>
> Key: FLINK-8093
> URL: https://issues.apache.org/jira/browse/FLINK-8093
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.3.2
> Environment: flink 1.3.2, kafka 0.9.1
>Reporter: dongtingting
>Priority: Critical
>
> one taskmanager has multiple taskslot, one task fail because of create 
> kafkaProducer fail,the reason for create kafkaProducer fail is 
> “javax.management.InstanceAlreadyExistsException: 
> kafka.producer:type=producer-metrics,client-id=producer-3”。 the detail trace 
> is :
> 2017-11-04 19:41:23,281 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Source: Custom Source -> Filter -> Map -> Filter -> Sink: 
> dp_client_**_log (7/80) (99551f3f892232d7df5eb9060fa9940c) switched from 
> RUNNING to FAILED.
> org.apache.kafka.common.KafkaException: Failed to construct kafka producer
> at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:321)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:181)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getKafkaProducer(FlinkKafkaProducerBase.java:202)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.open(FlinkKafkaProducerBase.java:212)
> at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.kafka.common.KafkaException: Error registering mbean 
> kafka.producer:type=producer-metrics,client-id=producer-3
> at 
> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:159)
> at 
> org.apache.kafka.common.metrics.JmxReporter.metricChange(JmxReporter.java:77)
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:288)
> at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:255)
> at org.apache.kafka.common.metrics.Metrics.addMetric(Metrics.java:239)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.registerMetrics(RecordAccumulator.java:137)
> at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.(RecordAccumulator.java:111)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:261)
> ... 9 more
> Caused by: javax.management.InstanceAlreadyExistsException: 
> kafka.producer:type=producer-metrics,client-id=producer-3
> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
> at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
> at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
> at 
> org.apache.kafka.common.metrics.JmxReporter.reregister(JmxReporter.java:157)
> ... 16 more
> I suspect that tasks in different task slots of one taskmanager use different 
> classloaders, and that the client id may be the same within one process. This 
> leads to the kafkaProducer creation failing in one 

[GitHub] flink pull request #5052: [FLINK-8133][REST][docs] Generate REST API documen...

2017-12-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5052


---


[jira] [Commented] (FLINK-8133) Generate documentation for new REST API

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281796#comment-16281796
 ] 

ASF GitHub Bot commented on FLINK-8133:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5052


> Generate documentation for new REST API
> ---
>
> Key: FLINK-8133
> URL: https://issues.apache.org/jira/browse/FLINK-8133
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-8133) Generate documentation for new REST API

2017-12-07 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8133.
---
Resolution: Fixed

master: 610fde722fc91ff760cff3865564a51a4b945f19

> Generate documentation for new REST API
> ---
>
> Key: FLINK-8133
> URL: https://issues.apache.org/jira/browse/FLINK-8133
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281780#comment-16281780
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155507607
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
--- End diff --

nit: wrong import order (not sorted lexicographically)
```
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
```
items should appear before `LogicalSlot`


> Add support for scheduling with slot sharing
> 
>
> Key: FLINK-7956
> URL: https://issues.apache.org/jira/browse/FLINK-7956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scheduler
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add 
> support for scheduling with slot sharing to the {{SlotPool}}. This will also 
> allow us to run all the IT cases based on the {{AbstractTestBase}} on the 
> Flip-6 {{MiniCluster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155507607
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
--- End diff --

nit: wrong import order (not sorted lexicographically)
```
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
```
items should appear before `LogicalSlot`


---


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155507294
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to 
run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link 
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the 
leaf nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which has a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the 
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link 
MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link 
MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link 
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} 
which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link 
SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link 
MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we 
cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks 
will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released 
from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map<SlotRequestId, TaskSlot> allTaskSlots;
+
+   // Root nodes which have not been completed because the allocated slot 
is still pending
+   private final Map<SlotRequestId, MultiTaskSlot> unresolvedRootSlots;
+
+   // Root nodes which have been completed (the underlying allocated slot 
has been assigned)
+   private final 

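The quoted Javadoc above describes a two-level TaskSlot tree: a root MultiTaskSlot that accepts at most one child per AbstractID (the task or co-location constraint id). A rough, self-contained sketch of that uniqueness invariant follows; the class and method names here are made up for illustration and are not Flink's actual API:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the hierarchy rule from the quoted Javadoc: a parent
// slot may hold at most one child per group id, mirroring the constraint
// that a MultiTaskSlot cannot contain two children with the same AbstractID.
class MultiSlotSketch {
    private final Map<String, String> children = new HashMap<>();

    // Returns false when a child with the same id already exists,
    // just as a second co-located child with the same id is rejected.
    boolean addChild(String groupId, String task) {
        return children.putIfAbsent(groupId, task) == null;
    }

    int size() {
        return children.size();
    }

    public static void main(String[] args) {
        MultiSlotSketch root = new MultiSlotSketch();
        if (!root.addChild("map-vertex", "Map (1/2)")) throw new AssertionError();
        // same id is rejected: only one child per AbstractID
        if (root.addChild("map-vertex", "Map (2/2)")) throw new AssertionError();
        if (root.size() != 1) throw new AssertionError();
    }
}
```

This captures only the "unique child id" rule; the real SlotSharingManager additionally tracks pending SlotContext futures and releases slots back to the owner.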
[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281778#comment-16281778
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155507294
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ---
@@ -0,0 +1,722 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotContext;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Manager which is responsible for slot sharing. Slot sharing allows to 
run different
+ * tasks in the same slot and to realize co-location constraints.
+ *
+ * The SlotSharingManager allows to create a hierarchy of {@link 
TaskSlot} such that
+ * every {@link TaskSlot} is uniquely identified by a {@link 
SlotRequestId} identifying
+ * the request for the TaskSlot and a {@link AbstractID} identifying the 
task or the
+ * co-location constraint running in this slot.
+ *
+ * The {@link TaskSlot} hierarchy is implemented by {@link 
MultiTaskSlot} and
+ * {@link SingleTaskSlot}. The former class represents inner nodes which 
can contain
+ * a number of other {@link TaskSlot} and the latter class represents the 
leaf nodes.
+ * The hierarchy starts with a root {@link MultiTaskSlot} which has a future
+ * {@link SlotContext} assigned. The {@link SlotContext} represents the 
allocated slot
+ * on the TaskExecutor in which all slots of this hierarchy run. A {@link 
MultiTaskSlot}
+ * can be assigned multiple {@link SingleTaskSlot} or {@link 
MultiTaskSlot} if and only if
+ * the task slot does not yet contain another child with the same {@link 
AbstractID} identifying
+ * the actual task or the co-location constraint.
+ *
+ * Normal slot sharing is represented by a root {@link MultiTaskSlot} 
which contains a set
+ * of {@link SingleTaskSlot} on the second layer. Each {@link 
SingleTaskSlot} represents a different
+ * task.
+ *
+ * Co-location constraints are modeled by adding a {@link 
MultiTaskSlot} to the root node. The co-location
+ * constraint is uniquely identified by a {@link AbstractID} such that we 
cannot add a second co-located
+ * {@link MultiTaskSlot} to the same root node. Now all co-located tasks 
will be added to co-located
+ * multi task slot.
+ */
+public class SlotSharingManager {
+
+   private final SlotSharingGroupId slotSharingGroupId;
+
+   // needed to release allocated slots after a complete multi task slot 
hierarchy has been released
+   private final AllocatedSlotActions allocatedSlotActions;
+
+   // owner of the slots to which to return them when they are released 
from the outside
+   private final SlotOwner slotOwner;
+
+   private final Map<SlotRequestId, TaskSlot> allTaskSlots;
+
+   // Root nodes which have not been completed because the 

[jira] [Commented] (FLINK-7684) Avoid multiple data copies in MergingWindowSet

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281775#comment-16281775
 ] 

ASF GitHub Bot commented on FLINK-7684:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4723
  
I think this needs a lot more description of what is happening.
For example, this introduces a prominent new configuration setting on the 
`ExecutionConfig` that is nowhere described, making it hard to review this. The 
javadocs on the `OptimizationTarget` are minimal, and no other docs have been 
added.

As a general thought: I am very skeptical about adding such a new setting; I 
would actually like us to go the opposite way and reduce the number of knobs 
further and further over time. Unless there are vast differences in the 
behavior, I find that an opinionated good implementation or choice of technique 
is better for users than offering a knob.


> Avoid multiple data copies in MergingWindowSet
> --
>
> Key: FLINK-7684
> URL: https://issues.apache.org/jira/browse/FLINK-7684
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.2.0, 1.3.0, 1.2.1, 1.3.1, 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>
> Currently MergingWindowSet uses a ListState of tuples to persist its mapping. 
> This is inefficient because the ListState of tuples must be converted to a 
> HashMap on each access.
> Furthermore, in some cases it might be inefficient to check whether the mapping 
> has changed before saving it to state.
> Those two issues cause multiple data copies and construct multiple 
> Lists/Maps per processed element, which is a reason for noticeable 
> performance issues.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

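The issue above describes avoiding per-element rebuilds by keeping the mapping in memory and persisting it only when it actually changed. A minimal sketch of that dirty-flag caching idea follows; it is an illustration of the technique only, not MergingWindowSet's actual code, and all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: keep the window mapping as a HashMap and snapshot it to a
// tuple-style list only when a mutation really changed something,
// instead of rebuilding a HashMap from ListState on every access.
class MappingCacheSketch {
    private final Map<String, String> mapping = new HashMap<>();
    private boolean dirty = false;

    void put(String key, String value) {
        // Map.put returns the previous value; only a real change marks dirty.
        if (!value.equals(mapping.put(key, value))) {
            dirty = true;
        }
    }

    // Persist as a list of (key, value) pairs only when needed;
    // returns null when nothing changed since the last snapshot.
    List<String[]> snapshotIfDirty() {
        if (!dirty) {
            return null;
        }
        List<String[]> out = new ArrayList<>();
        mapping.forEach((k, v) -> out.add(new String[] {k, v}));
        dirty = false;
        return out;
    }

    public static void main(String[] args) {
        MappingCacheSketch c = new MappingCacheSketch();
        c.put("w1", "state1");
        if (c.snapshotIfDirty() == null) throw new AssertionError();
        c.put("w1", "state1"); // unchanged value: no snapshot needed
        if (c.snapshotIfDirty() != null) throw new AssertionError();
    }
}
```

The design point is that the serialization cost is paid once per change rather than once per processed element.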

[GitHub] flink issue #4723: [FLINK-7684] Avoid data copies in MergingWindowSet

2017-12-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4723
  
I think this needs a lot more description of what is happening.
For example, this introduces a prominent new configuration setting on the 
`ExecutionConfig` that is nowhere described, making it hard to review this. The 
javadocs on the `OptimizationTarget` are minimal, and no other docs have been 
added.

As a general thought: I am very skeptical about adding such a new setting; I 
would actually like us to go the opposite way and reduce the number of knobs 
further and further over time. Unless there are vast differences in the 
behavior, I find that an opinionated good implementation or choice of technique 
is better for users than offering a knob.


---


[jira] [Commented] (FLINK-7812) Log system resources as metrics

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281768#comment-16281768
 ] 

ASF GitHub Bot commented on FLINK-7812:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4801#discussion_r155503740
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 ---
@@ -102,37 +103,16 @@ public static void instantiateStatusMetrics(
private static void instantiateNetworkMetrics(
MetricGroup metrics,
final NetworkEnvironment network) {
-   metrics.gauge("TotalMemorySegments", new 
Gauge<Long>() {
-   @Override
-   public Long getValue() {
-   return (long) 
network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
-   }
-   });
 
-   metrics.gauge("AvailableMemorySegments", new 
Gauge<Long>() {
-   @Override
-   public Long getValue() {
-   return (long) 
network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
-   }
-   });
+   final NetworkBufferPool networkBufferPool = 
network.getNetworkBufferPool();
+   metrics.gauge("TotalMemorySegments", () -> 
(long) networkBufferPool.getTotalNumberOfMemorySegments());
--- End diff --

Replace with "Integer" Gauge and change to method reference?


> Log system resources as metrics
> ---
>
> Key: FLINK-7812
> URL: https://issues.apache.org/jira/browse/FLINK-7812
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>





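The reviewer's suggestion above is to expose the pool counter through an Integer gauge bound with a method reference rather than a widening lambda. A minimal sketch of that shape follows; `Gauge` and `NetworkBufferPoolStub` here are local stand-ins, not Flink's actual classes:

```java
// Local stand-in for Flink's Gauge interface: a single-method supplier,
// so it can be implemented by a method reference.
interface Gauge<T> {
    T getValue();
}

class GaugeSketch {
    // Hypothetical stub standing in for NetworkBufferPool.
    static class NetworkBufferPoolStub {
        int getTotalNumberOfMemorySegments() {
            return 1024;
        }
    }

    public static void main(String[] args) {
        NetworkBufferPoolStub pool = new NetworkBufferPoolStub();
        // Integer gauge bound directly with a method reference, as suggested
        // in the review, instead of a lambda that widens the int to long.
        Gauge<Integer> total = pool::getTotalNumberOfMemorySegments;
        if (total.getValue() != 1024) throw new AssertionError();
    }
}
```

The method reference compiles because the `int` return value boxes to `Integer`, matching `Gauge<Integer>.getValue()`.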

[jira] [Commented] (FLINK-7812) Log system resources as metrics

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281767#comment-16281767
 ] 

ASF GitHub Bot commented on FLINK-7812:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4801#discussion_r155503385
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/SystemResourcesCounter.java
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.utils;
+
+import org.apache.flink.api.common.time.Time;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import oshi.SystemInfo;
+import oshi.hardware.CentralProcessor;
+import oshi.hardware.CentralProcessor.TickType;
+import oshi.hardware.HardwareAbstractionLayer;
+import oshi.hardware.NetworkIF;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Daemon thread logging system resources.
+ *
+ * To accurately and consistently report CPU and network usage we have to 
periodically probe
+ * CPU ticks and network sent/received bytes and then convert those values 
to CPU usage and
+ * send/receive byte rates.
+ */
+@ThreadSafe
--- End diff --

A Thread is ThreadSafe? ;-)


> Log system resources as metrics
> ---
>
> Key: FLINK-7812
> URL: https://issues.apache.org/jira/browse/FLINK-7812
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>





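The quoted Javadoc above describes deriving CPU usage and byte rates by probing monotonically increasing counters periodically and converting the deltas. A minimal sketch of that delta-to-rate computation follows; the names are illustrative and not part of Flink's or oshi's API:

```java
// Sketch of the probing scheme from the quoted Javadoc: sample a
// monotonically increasing byte counter at two points in time and
// derive a bytes-per-second rate from the two deltas.
class RateSketch {
    static double rate(long prevBytes, long prevNanos, long curBytes, long curNanos) {
        long elapsed = curNanos - prevNanos;
        if (elapsed <= 0) {
            return 0.0; // guard against a zero or negative probe interval
        }
        // delta bytes scaled from nanoseconds to seconds
        return (curBytes - prevBytes) * 1_000_000_000.0 / elapsed;
    }

    public static void main(String[] args) {
        // 500 bytes over half a second -> 1000 bytes/second
        double r = rate(1_000, 0, 1_500, 500_000_000L);
        if (Math.abs(r - 1000.0) > 1e-9) throw new AssertionError();
    }
}
```

CPU usage works the same way, with busy ticks in the numerator and total ticks in the denominator instead of wall-clock time.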

[jira] [Commented] (FLINK-7812) Log system resources as metrics

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281766#comment-16281766
 ] 

ASF GitHub Bot commented on FLINK-7812:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4801#discussion_r155501129
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerITCase.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST;
+import static 
org.apache.flink.configuration.TaskManagerOptions.ADDITIONAL_LOGGING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for proper initialization in the {@link 
TaskManagerRunner}.
+ */
+public class TaskManagerRunnerITCase {
--- End diff --

This test seems very specific to this logging, but is named as a generic 
TaskManagerRunner test. Give it a different name?

Separate question: Does it have to be an IT case that fully starts the TM, 
or can it be a unit test that checks config propagation?


> Log system resources as metrics
> ---
>
> Key: FLINK-7812
> URL: https://issues.apache.org/jira/browse/FLINK-7812
> Project: Flink
>  Issue Type: New Feature
>  Components: Metrics
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>






[GitHub] flink pull request #4801: [FLINK-7812] Log system resources metrics

2017-12-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4801#discussion_r155503740
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/util/MetricUtils.java
 ---
@@ -102,37 +103,16 @@ public static void instantiateStatusMetrics(
private static void instantiateNetworkMetrics(
MetricGroup metrics,
final NetworkEnvironment network) {
-   metrics.gauge("TotalMemorySegments", new 
Gauge<Long>() {
-   @Override
-   public Long getValue() {
-   return (long) 
network.getNetworkBufferPool().getTotalNumberOfMemorySegments();
-   }
-   });
 
-   metrics.gauge("AvailableMemorySegments", new 
Gauge<Long>() {
-   @Override
-   public Long getValue() {
-   return (long) 
network.getNetworkBufferPool().getNumberOfAvailableMemorySegments();
-   }
-   });
+   final NetworkBufferPool networkBufferPool = 
network.getNetworkBufferPool();
+   metrics.gauge("TotalMemorySegments", () -> 
(long) networkBufferPool.getTotalNumberOfMemorySegments());
--- End diff --

Replace with "Integer" Gauge and change to method reference?


---


[GitHub] flink pull request #4801: [FLINK-7812] Log system resources metrics

2017-12-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4801#discussion_r155503385
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/utils/SystemResourcesCounter.java
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.utils;
+
+import org.apache.flink.api.common.time.Time;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import oshi.SystemInfo;
+import oshi.hardware.CentralProcessor;
+import oshi.hardware.CentralProcessor.TickType;
+import oshi.hardware.HardwareAbstractionLayer;
+import oshi.hardware.NetworkIF;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Daemon thread logging system resources.
+ *
+ * To accurately and consistently report CPU and network usage we have to 
periodically probe
+ * CPU ticks and network sent/received bytes and then convert those values 
to CPU usage and
+ * send/receive byte rates.
+ */
+@ThreadSafe
--- End diff --

A Thread is ThreadSafe? ;-)


---


[GitHub] flink pull request #4801: [FLINK-7812] Log system resources metrics

2017-12-07 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4801#discussion_r155501129
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerITCase.java
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.configuration.MetricOptions.REPORTERS_LIST;
+import static 
org.apache.flink.configuration.TaskManagerOptions.ADDITIONAL_LOGGING;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for proper initialization in the {@link 
TaskManagerRunner}.
+ */
+public class TaskManagerRunnerITCase {
--- End diff --

This test seems very specific to this logging, but is named as a generic 
TaskManagerRunner test. Give it a different name?

Separate question: Does it have to be an IT case that fully starts the TM, 
or can it be a unit test that checks config propagation?


---


[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281765#comment-16281765
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155503994
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class SchedulerTest extends TestLogger {
+
+   @Test
+   public void testAddAndRemoveInstance() {
+   try {
+   Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
+
+   Instance i1 = getRandomInstance(2);
+   Instance i2 = getRandomInstance(2);
+   Instance i3 = getRandomInstance(2);
+
+   assertEquals(0, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(0, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i1);
+   assertEquals(1, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(2, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i2);
+   assertEquals(2, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i3);
+   assertEquals(3, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+
+   // cannot add available instance again
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted instance twice");
+   }
+   catch (IllegalArgumentException e) {
+   // bueno!
+   }
+
+   // some instances die
+   assertEquals(3, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+   scheduler.instanceDied(i2);
+   assertEquals(2, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+
+   // try to add a dead instance
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted dead instance");
+   }
+   catch (IllegalArgumentException e) {
+   // stimmt
+
+   }
+
+   scheduler.instanceDied(i1);
+   assertEquals(1, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(2, scheduler.getNumberOfAvailableSlots());
+   scheduler.instanceDied(i3);
+   assertEquals(0, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(0, scheduler.getNumberOfAvailableSlots());
+
+   assertFalse(i1.isAlive());
+   assertFalse(i2.isAlive());
+   

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155503866
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class SchedulerTest extends TestLogger {
+
+   @Test
+   public void testAddAndRemoveInstance() {
+   try {
+   Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
+
+   Instance i1 = getRandomInstance(2);
+   Instance i2 = getRandomInstance(2);
+   Instance i3 = getRandomInstance(2);
+
+   assertEquals(0, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(0, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i1);
+   assertEquals(1, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(2, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i2);
+   assertEquals(2, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i3);
+   assertEquals(3, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+
+   // cannot add available instance again
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted instance twice");
+   }
+   catch (IllegalArgumentException e) {
+   // bueno!
+   }
+
+   // some instances die
+   assertEquals(3, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+   scheduler.instanceDied(i2);
+   assertEquals(2, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+
+   // try to add a dead instance
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted dead instance");
+   }
+   catch (IllegalArgumentException e) {
+   // stimmt
--- End diff --

😃 


---


[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281764#comment-16281764
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155503866
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class SchedulerTest extends TestLogger {
+
+   @Test
+   public void testAddAndRemoveInstance() {
+   try {
+   Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
+
+   Instance i1 = getRandomInstance(2);
+   Instance i2 = getRandomInstance(2);
+   Instance i3 = getRandomInstance(2);
+
+   assertEquals(0, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(0, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i1);
+   assertEquals(1, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(2, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i2);
+   assertEquals(2, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i3);
+   assertEquals(3, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+
+   // cannot add available instance again
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted instance twice");
+   }
+   catch (IllegalArgumentException e) {
+   // bueno!
+   }
+
+   // some instances die
+   assertEquals(3, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+   scheduler.instanceDied(i2);
+   assertEquals(2, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+
+   // try to add a dead instance
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted dead instance");
+   }
+   catch (IllegalArgumentException e) {
+   // stimmt
--- End diff --

😃 


> Add support for scheduling with slot sharing
> 
>
> Key: FLINK-7956
> URL: https://issues.apache.org/jira/browse/FLINK-7956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scheduler
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add 
> support for scheduling with slot sharing to the 

[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155503994
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTest.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static 
org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils.getRandomInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class SchedulerTest extends TestLogger {
+
+   @Test
+   public void testAddAndRemoveInstance() {
+   try {
+   Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
+
+   Instance i1 = getRandomInstance(2);
+   Instance i2 = getRandomInstance(2);
+   Instance i3 = getRandomInstance(2);
+
+   assertEquals(0, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(0, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i1);
+   assertEquals(1, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(2, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i2);
+   assertEquals(2, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+   scheduler.newInstanceAvailable(i3);
+   assertEquals(3, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+
+   // cannot add available instance again
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted instance twice");
+   }
+   catch (IllegalArgumentException e) {
+   // bueno!
+   }
+
+   // some instances die
+   assertEquals(3, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(6, scheduler.getNumberOfAvailableSlots());
+   scheduler.instanceDied(i2);
+   assertEquals(2, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(4, scheduler.getNumberOfAvailableSlots());
+
+   // try to add a dead instance
+   try {
+   scheduler.newInstanceAvailable(i2);
+   fail("Scheduler accepted dead instance");
+   }
+   catch (IllegalArgumentException e) {
+   // stimmt
+
+   }
+
+   scheduler.instanceDied(i1);
+   assertEquals(1, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(2, scheduler.getNumberOfAvailableSlots());
+   scheduler.instanceDied(i3);
+   assertEquals(0, 
scheduler.getNumberOfAvailableInstances());
+   assertEquals(0, scheduler.getNumberOfAvailableSlots());
+
+   assertFalse(i1.isAlive());
+   assertFalse(i2.isAlive());
+   assertFalse(i3.isAlive());
+   }
+   catch (Exception e) {
--- End diff --

Better to propagate the exception, but I guess this file was copy-pasted.
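
The suggestion above — let unexpected exceptions propagate instead of catching them and calling `fail()` — can be sketched framework-free. The `expectThrows` helper below is hypothetical (JUnit 4.13+ ships an equivalent `assertThrows`); the exception message is made up for illustration:

```java
// Hypothetical sketch: assert an expected exception with a helper instead of
// try/catch + fail(), and let any unexpected exception propagate to the runner.
public class ExpectThrowsSketch {

    interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Returns the caught exception if it matches the expected type,
    // rethrows anything else so the test runner reports it directly.
    static <T extends Exception> T expectThrows(Class<T> type, ThrowingRunnable action) throws Exception {
        try {
            action.run();
        } catch (Exception e) {
            if (type.isInstance(e)) {
                return type.cast(e);
            }
            throw e; // unexpected exception: propagate, do not swallow
        }
        throw new AssertionError("Expected " + type.getSimpleName() + " but nothing was thrown");
    }

    public static void main(String[] args) throws Exception {
        IllegalArgumentException e = expectThrows(
            IllegalArgumentException.class,
            () -> { throw new IllegalArgumentException("Instance is already registered"); });
        System.out.println(e.getMessage()); // prints "Instance is already registered"
    }
}
```

The outer try/catch around the whole test body then disappears entirely; the test method just declares `throws Exception`.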


---


[jira] [Commented] (FLINK-7956) Add support for scheduling with slot sharing

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281759#comment-16281759
 ] 

ASF GitHub Bot commented on FLINK-7956:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155502971
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingAllocatedSlotActions.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Simple {@link AllocatedSlotActions} implementation for testing purposes.
+ */
+public class TestingAllocatedSlotActions implements AllocatedSlotActions {
+
+   private volatile Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> releaseSlotConsumer;
+
+   public void setReleaseSlotConsumer(Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> releaseSlotConsumer) {
+   this.releaseSlotConsumer = 
Preconditions.checkNotNull(releaseSlotConsumer);
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, @Nullable Throwable cause) {
+   Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> currentReleaseSlotConsumer = this.releaseSlotConsumer;
+
+   if (currentReleaseSlotConsumer != null) {
+   
currentReleaseSlotConsumer.accept(Tuple3.of(slotRequestId, slotSharingGroupId, 
cause));
--- End diff --

nit: whitespace after `cause`
```
... cause   ));
```


> Add support for scheduling with slot sharing
> 
>
> Key: FLINK-7956
> URL: https://issues.apache.org/jira/browse/FLINK-7956
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scheduler
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> In order to reach feature equivalence with the old code base, we should add 
> support for scheduling with slot sharing to the {{SlotPool}}. This will also 
> allow us to run all the IT cases based on the {{AbstractTestBase}} on the 
> Flip-6 {{MiniCluster}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5091: [FLINK-7956] [flip6] Add support for queued schedu...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5091#discussion_r155502971
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/TestingAllocatedSlotActions.java
 ---
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Simple {@link AllocatedSlotActions} implementation for testing purposes.
+ */
+public class TestingAllocatedSlotActions implements AllocatedSlotActions {
+
+   private volatile Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> releaseSlotConsumer;
+
+   public void setReleaseSlotConsumer(Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> releaseSlotConsumer) {
+   this.releaseSlotConsumer = 
Preconditions.checkNotNull(releaseSlotConsumer);
+   }
+
+   @Override
+   public CompletableFuture<Acknowledge> releaseSlot(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, @Nullable Throwable cause) {
+   Consumer<Tuple3<SlotRequestId, SlotSharingGroupId, Throwable>> currentReleaseSlotConsumer = this.releaseSlotConsumer;
+
+   if (currentReleaseSlotConsumer != null) {
+   
currentReleaseSlotConsumer.accept(Tuple3.of(slotRequestId, slotSharingGroupId, 
cause));
--- End diff --

nit: whitespace after `cause`
```
... cause   ));
```


---


[jira] [Commented] (FLINK-8133) Generate documentation for new REST API

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281754#comment-16281754
 ] 

ASF GitHub Bot commented on FLINK-8133:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5052
  
merging.


> Generate documentation for new REST API
> ---
>
> Key: FLINK-8133
> URL: https://issues.apache.org/jira/browse/FLINK-8133
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>






[GitHub] flink issue #5052: [FLINK-8133][REST][docs] Generate REST API documentation

2017-12-07 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5052
  
merging.


---


[jira] [Commented] (FLINK-8174) Mesos RM unable to accept offers for unreserved resources

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281752#comment-16281752
 ] 

ASF GitHub Bot commented on FLINK-8174:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5114
  
It depends a little bit on 1.4. We have serious build stability issues in 
the `release-1.3` branch and it would take a serious effort to stabilize it 
again. If I could choose, then we wouldn't have to release 1.3.3 because users 
would switch directly to 1.4.


> Mesos RM unable to accept offers for unreserved resources
> -
>
> Key: FLINK-8174
> URL: https://issues.apache.org/jira/browse/FLINK-8174
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos
>Affects Versions: 1.4.0, 1.3.3
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Blocker
> Fix For: 1.4.0, 1.5.0
>
>
> Flink has suffered a regression due to FLINK-7294.   Any attempt to accept a 
> resource offer that is based on unreserved resources will fail, because Flink 
> (as of FLINK-7294) erroneously insists that the resource come from a prior 
> reservation.
> Looking at the original issue, the problem may have been misdiagnosed.  
> Ideally Flink should work with both reserved and unreserved resources, but 
> the latter is a more common situation that is now broken.





[jira] [Commented] (FLINK-8133) Generate documentation for new REST API

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281753#comment-16281753
 ] 

ASF GitHub Bot commented on FLINK-8133:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5052#discussion_r155501188
  
--- Diff: 
flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java ---
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.docs.rest;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.ConfigurationException;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
+import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Generator for the Rest API documentation.
+ *
+ * This class can be either invoked directly
--- End diff --

leftover comment, will remove it.


> Generate documentation for new REST API
> ---
>
> Key: FLINK-8133
> URL: https://issues.apache.org/jira/browse/FLINK-8133
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, REST
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>






[GitHub] flink issue #5114: [FLINK-8174] [mesos] Mesos RM unable to accept offers for...

2017-12-07 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5114
  
It depends a little bit on 1.4. We have serious build stability issues in 
the `release-1.3` branch and it would take a serious effort to stabilize it 
again. If I could choose, then we wouldn't have to release 1.3.3 because users 
would switch directly to 1.4.


---


[GitHub] flink pull request #5052: [FLINK-8133][REST][docs] Generate REST API documen...

2017-12-07 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5052#discussion_r155501188
  
--- Diff: 
flink-docs/src/main/java/org/apache/flink/docs/rest/RestAPIDocGenerator.java ---
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.docs.rest;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.RestServerEndpoint;
+import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.ConfigurationException;
+
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
+import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+/**
+ * Generator for the Rest API documentation.
+ *
+ * This class can be either invoked directly
--- End diff --

leftover comment, will remove it.


---


[jira] [Commented] (FLINK-7561) Add support for pre-aggregation in DataStream API

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281748#comment-16281748
 ] 

ASF GitHub Bot commented on FLINK-7561:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4626
  
I think it would be nice to have a utility here in order to make this 
easier to use:
```java
DataStream<OUT> result =
    Utils.withPreaggregation(
            stream.timeWindow(Time.minutes(5)),
            myAggregateFunction)
        .apply(windowFunction);
```

The utility would basically take the aggregate function and insert the 
stream transformation for the pre-aggregation on the *predecessor* of the 
keyed stream, and then set up the `WindowedStream` again.

Pseudo code:
```java
public static <IN, ACC, KEY, W extends Window> WindowedStream<ACC, KEY, W> preaggregate(
        WindowedStream<IN, KEY, W> windowedStream,
        AggregateFunction<IN, ACC, ACC> preAggregator) {

    // sanity check that the windowedStream has no custom trigger and evictor

    PreAggregationOperator<IN, ACC> preAggOp =
            new PreAggregationOperator<>(preAggregator, /* properties from windowed stream */);

    DataStream<IN> originalStream = /* get predecessor before keyBy from windowed stream */;
    DataStream<ACC> preAggregated = originalStream.transform(preAggOp, ...);

    WindowedStream<ACC, KEY, W> windowedAgain = preAggregated
            .keyBy(/* key extractor from original windowed stream */)
            .window(assigner);

    return windowedAgain;
}
```
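
The effect of such a pre-aggregation step can be illustrated without any Flink classes: records are combined per key locally before they would be shuffled to the windowed operator, so fewer records cross the network. A minimal, framework-free sketch (all names are illustrative):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free sketch of local pre-aggregation (a "combiner"): sum values
// per key before the records are shipped to the downstream window operator.
public class PreAggregationSketch {

    static Map<String, Integer> preAggregate(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> combined = new HashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            // Integer::sum plays the role of the AggregateFunction's add/merge.
            combined.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = List.of(
            Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3), Map.entry("b", 4));
        Map<String, Integer> combined = preAggregate(records);
        // four input records shrink to two pre-aggregated records
        System.out.println(combined.get("a") + " " + combined.get("b")); // prints "4 6"
    }
}
```

In the proposed utility this combining would run continuously on the predecessor stream, emitting partial aggregates that the windowed operator then merges.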


> Add support for pre-aggregation in DataStream API
> -
>
> Key: FLINK-7561
> URL: https://issues.apache.org/jira/browse/FLINK-7561
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>






[GitHub] flink issue #4626: [FLINK-7561][streaming] Implement PreAggregationOperator

2017-12-07 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/4626
  
I think it would be nice to have a utility here in order to make this 
easier to use:
```java
DataStream<OUT> result =
    Utils.withPreaggregation(
            stream.timeWindow(Time.minutes(5)),
            myAggregateFunction)
        .apply(windowFunction);
```

The utility would basically take the aggregate function and insert the 
stream transformation for the pre-aggregation on the *predecessor* of the 
keyed stream, and then set up the `WindowedStream` again.

Pseudo code:
```java
public static <IN, ACC, KEY, W extends Window> WindowedStream<ACC, KEY, W> preaggregate(
        WindowedStream<IN, KEY, W> windowedStream,
        AggregateFunction<IN, ACC, ACC> preAggregator) {

    // sanity check that the windowedStream has no custom trigger and evictor

    PreAggregationOperator<IN, ACC> preAggOp =
            new PreAggregationOperator<>(preAggregator, /* properties from windowed stream */);

    DataStream<IN> originalStream = /* get predecessor before keyBy from windowed stream */;
    DataStream<ACC> preAggregated = originalStream.transform(preAggOp, ...);

    WindowedStream<ACC, KEY, W> windowedAgain = preAggregated
            .keyBy(/* key extractor from original windowed stream */)
            .window(assigner);

    return windowedAgain;
}
```


---


[jira] [Commented] (FLINK-8089) Fulfil slot requests with unused offered slots

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281732#comment-16281732
 ] 

ASF GitHub Bot commented on FLINK-8089:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5090#discussion_r155496482
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java 
---
@@ -383,6 +386,76 @@ public void 
testSlotRequestCancellationUponFailingRequest() throws Exception {
}
}
 
+   /**
+* Tests that unused offered slots are directly used to fulfil pending 
slot
+* requests.
+*
+* See FLINK-8089
+*/
+   @Test
+   public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws 
Exception {
+   final SlotPool slotPool = new SlotPool(rpcService, jobId);
+
+   final JobMasterId jobMasterId = JobMasterId.generate();
+   final String jobMasterAddress = "foobar";
+   final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway();
+
+   resourceManagerGateway.setRequestSlotConsumer(
+   (SlotRequest slotRequest) -> 
allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+   final SlotRequestID slotRequestId1 = new SlotRequestID();
+   final SlotRequestID slotRequestId2 = new SlotRequestID();
+
+   try {
+   slotPool.start(jobMasterId, jobMasterAddress);
+
+   final SlotPoolGateway slotPoolGateway = 
slotPool.getSelfGateway(SlotPoolGateway.class);
+
+   final ScheduledUnit scheduledUnit = new 
ScheduledUnit(mock(Execution.class));
+
+   
slotPoolGateway.connectToResourceManager(resourceManagerGateway);
+
+   CompletableFuture<LogicalSlot> slotFuture1 = 
slotPoolGateway.allocateSlot(
+   slotRequestId1,
+   scheduledUnit,
+   ResourceProfile.UNKNOWN,
+   Collections.emptyList(),
+   timeout);
+
+   // wait for the first slot request
+   final AllocationID allocationId = 
allocationIdFuture.get();
+
+   CompletableFuture<LogicalSlot> slotFuture2 = 
slotPoolGateway.allocateSlot(
+   slotRequestId2,
+   scheduledUnit,
+   ResourceProfile.UNKNOWN,
+   Collections.emptyList(),
+   timeout);
+
+   slotPoolGateway.cancelSlotRequest(slotRequestId1);
+
+   try {
+   // this should fail with a CancellationException
+   slotFuture1.get();
+   fail("The first slot future should have failed 
because it was cancelled.");
+   } catch (ExecutionException ee) {
+   
assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof 
CancellationException);
+   }
+
+   final SlotOffer slotOffer = new SlotOffer(allocationId, 
0, ResourceProfile.UNKNOWN);
+
+   
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+   
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, 
slotOffer).get());
+
+   // the slot offer should fulfil the second slot request
--- End diff --

nit: same here


> Fulfil slot requests with unused offered slots
> --
>
> Key: FLINK-8089
> URL: https://issues.apache.org/jira/browse/FLINK-8089
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>  Labels: flip-6
>
> The {{SlotPool}} adds unused offered slots to the list of available slots 
> without checking whether another pending slot request could be fulfilled with 
> this slot. This should be changed.
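>
> The fix described above can be sketched independently of the actual `SlotPool` (the class and method names below are illustrative, not Flink's API): when a slot is offered or returned, first try to hand it to a waiting request, and only park it as available if nobody is waiting.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch (not Flink's API): offered/returned slots first fulfil
// pending requests; only otherwise are they added to the available pool.
public class TinySlotPool {

    private final Deque<CompletableFuture<String>> pendingRequests = new ArrayDeque<>();
    private final Deque<String> availableSlots = new ArrayDeque<>();

    public CompletableFuture<String> requestSlot() {
        if (!availableSlots.isEmpty()) {
            return CompletableFuture.completedFuture(availableSlots.poll());
        }
        CompletableFuture<String> pending = new CompletableFuture<>();
        pendingRequests.add(pending);
        return pending;
    }

    public void offerSlot(String slot) {
        // the FLINK-8089 idea: check pending requests before parking the slot
        CompletableFuture<String> waiting = pendingRequests.poll();
        if (waiting != null) {
            waiting.complete(slot);
        } else {
            availableSlots.add(slot);
        }
    }

    public static void main(String[] args) {
        TinySlotPool pool = new TinySlotPool();
        CompletableFuture<String> request = pool.requestSlot(); // no slot yet -> pending
        pool.offerSlot("slot-1");                               // fulfils the pending request
        System.out.println(request.join()); // prints "slot-1"
    }
}
```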





[GitHub] flink pull request #5090: [FLINK-8089] Also check for other pending slot req...

2017-12-07 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5090#discussion_r155496482
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java 
---
@@ -383,6 +386,76 @@ public void 
testSlotRequestCancellationUponFailingRequest() throws Exception {
}
}
 
+   /**
+* Tests that unused offered slots are directly used to fulfil pending 
slot
+* requests.
+*
+* See FLINK-8089
+*/
+   @Test
+   public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws 
Exception {
+   final SlotPool slotPool = new SlotPool(rpcService, jobId);
+
+   final JobMasterId jobMasterId = JobMasterId.generate();
+   final String jobMasterAddress = "foobar";
+   final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
+   final TestingResourceManagerGateway resourceManagerGateway = 
new TestingResourceManagerGateway();
+
+   resourceManagerGateway.setRequestSlotConsumer(
+   (SlotRequest slotRequest) -> 
allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+   final SlotRequestID slotRequestId1 = new SlotRequestID();
+   final SlotRequestID slotRequestId2 = new SlotRequestID();
+
+   try {
+   slotPool.start(jobMasterId, jobMasterAddress);
+
+   final SlotPoolGateway slotPoolGateway = 
slotPool.getSelfGateway(SlotPoolGateway.class);
+
+   final ScheduledUnit scheduledUnit = new 
ScheduledUnit(mock(Execution.class));
+
+   
slotPoolGateway.connectToResourceManager(resourceManagerGateway);
+
+   CompletableFuture<LogicalSlot> slotFuture1 = 
slotPoolGateway.allocateSlot(
+   slotRequestId1,
+   scheduledUnit,
+   ResourceProfile.UNKNOWN,
+   Collections.emptyList(),
+   timeout);
+
+   // wait for the first slot request
+   final AllocationID allocationId = 
allocationIdFuture.get();
+
+   CompletableFuture<LogicalSlot> slotFuture2 = 
slotPoolGateway.allocateSlot(
+   slotRequestId2,
+   scheduledUnit,
+   ResourceProfile.UNKNOWN,
+   Collections.emptyList(),
+   timeout);
+
+   slotPoolGateway.cancelSlotRequest(slotRequestId1);
+
+   try {
+   // this should fail with a CancellationException
+   slotFuture1.get();
+   fail("The first slot future should have failed 
because it was cancelled.");
+   } catch (ExecutionException ee) {
+   
assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof 
CancellationException);
+   }
+
+   final SlotOffer slotOffer = new SlotOffer(allocationId, 
0, ResourceProfile.UNKNOWN);
+
+   
slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();
+
+   
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, 
slotOffer).get());
+
+   // the slot offer should fulfil the second slot request
--- End diff --

nit: same here


---


[jira] [Commented] (FLINK-8203) Make schema definition of DataStream/DataSet to Table conversion more flexible

2017-12-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16281731#comment-16281731
 ] 

ASF GitHub Bot commented on FLINK-8203:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/5132

[FLINK-8203] [FLINK-7681] [table] Make schema definition of 
DataStream/DataSet to Table conversion more flexible

## What is the purpose of the change

This PR makes the schema definition more flexible. It adds two ways of 
defining schema information:

Reference input fields by name:
All fields in the schema definition are referenced by name (and possibly 
renamed using an alias (as)). In this mode, fields can be reordered and 
projected out. Moreover, we can define proctime and rowtime attributes at 
arbitrary positions using arbitrary names (except those that exist in the 
result schema). This mode can be used for any input type, including POJOs.

Reference input fields by position:
Field references must refer to existing fields in the input type (except for 
renaming with an alias (as)). In this mode, fields are simply renamed. 
Event-time attributes can replace the field at their position in the input 
data (if it is of the correct type) or be appended at the end. Proctime 
attributes must be appended at the end. This mode can only be used if the 
input type has a defined field order (tuple, case class, Row) and none of 
the fields references a field of the input type.
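
To make the position-based rules concrete, here is a toy sketch of resolving a requested schema against an ordered input type. This is a simplification written for this discussion, not Flink's actual resolution code; the `.rowtime`/`.proctime` suffix convention mirrors the Table API's string-based field expressions:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PositionModeDemo {

    // Toy position-based resolution (assumption: simplified from the rules
    // described above, not Flink code): a ".rowtime" attribute replaces the
    // input field at its position, a ".proctime" attribute consumes no input
    // field and is appended, and every other name is a plain positional rename.
    static List<String> resolve(List<String> inputFields, List<String> requested) {
        List<String> result = new ArrayList<>();
        int pos = 0;
        for (String field : requested) {
            if (field.endsWith(".proctime")) {
                // appended at the end, does not consume an input field
                result.add(field.substring(0, field.indexOf('.')));
            } else if (field.endsWith(".rowtime")) {
                // replaces the input field at this position
                result.add(field.substring(0, field.indexOf('.')));
                pos++;
            } else {
                // simple rename of inputFields.get(pos)
                result.add(field);
                pos++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Tuple3-like input (f0, f1, f2) where f2 holds the event timestamp.
        List<String> in = Arrays.asList("f0", "f1", "f2");
        List<String> schema =
            resolve(in, Arrays.asList("user", "url", "clicktime.rowtime", "proc.proctime"));
        System.out.println(schema);  // [user, url, clicktime, proc]
    }
}
```
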

It also allows any TypeInformation. In the past, this behavior was not 
consistent.

I will add some paragraphs to the documentation once we agree on this new 
behavior.

## Brief change log

Various changes in `TableEnvironment`, `Stream/BatchTableEnvironment`, and 
pattern matches that referenced `AtomicType` instead of `TypeInformation`.


## Verifying this change

See TableEnvironment tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? will document it later


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8203

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5132.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5132


commit 38562e1dcc5416996ad5531b901f89e4b868e5eb
Author: twalthr 
Date:   2017-12-07T10:52:28Z

[FLINK-8203] [FLINK-7681] [table] Make schema definition of 
DataStream/DataSet to Table conversion more flexible




> Make schema definition of DataStream/DataSet to Table conversion more flexible
> --
>
> Key: FLINK-8203
> URL: https://issues.apache.org/jira/browse/FLINK-8203
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>
> When converting or registering a {{DataStream}} or {{DataSet}} as a {{Table}}, 
> the schema of the table can be defined (by default it is extracted from the 
> {{TypeInformation}}).
> The schema needs to be manually specified to select (project) fields, rename 
> fields, or define time attributes. Right now, there are several limitations on 
> how the fields can be defined, which also depend on the type of the 
> {{DataStream}} / {{DataSet}}. Types with explicit field ordering (e.g., 
> tuples, case classes, Row) require a schema definition based on the position of 
> fields. Pojo types, which have no fixed order of fields, require referring to 
> fields by name. Moreover, there are several restrictions on how time 
> attributes can be defined, e.g., an event-time attribute must replace an 
> existing field or be appended, and proctime attributes must be appended.
> I think we can make the schema definition more flexible and provide two modes:
> 1. Reference input fields by name: All fields in the schema definition are 
> referenced by name (and possibly renamed using an alias ({{as}})). In this 
> mode, fields can be reordered and projected out. Moreover, we can 
