[jira] [Updated] (FLINK-12119) Add OWASP Dependency Check to Flink Build

2019-04-08 Thread Konstantin Knauf (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-12119:
-
Description: 
In order to obtain some visibility on the current known security 
vulnerabilities in Flink's dependencies, it would be useful to include the 
OWASP dependency check plugin [1] into our Maven build.

By including it into flink-parent, we can get a summary of all dependencies of 
all child projects by running

{{mvn clean org.owasp:dependency-check-maven:5.0.0-M2:aggregate}}

We should probably exclude some modules from the dependency-check. These could 
be:
 * flink-docs
 * flink-fs-tests
 * flink-yarn-tests
 * flink-contrib

Anything else? What about flink-python/flink-streaming-python?

In addition, I propose to exclude all dependencies in the *system* or *provided* 
scope.

At least initially, the build would never fail because of vulnerabilities.

 [1] 
[https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html]

  was:
In order to obtain some visibility on the current known security 
vulnerabilities in Flink's dependencies, it would be useful to include the 
OWASP dependency check plugin [1] into our Maven build.

By including it into flink-parent, we can get a summary of all dependencies of 
all child projects by running

{{mvn clean org.owasp:dependency-check-maven:5.0.0-M2:aggregate}}

We should probably exclude some modules from the dependency-check. These could 
be:
 * flink-docs
 * flink-end-to-end-tests
 * flink-fs-tests
 * flink-test-utils-parent
 * flink-yarn-tests
 * flink-contrib

Anything else? What about flink-python/flink-streaming-python?

In addition, I propose to exclude all dependencies in the *system* or *provided* 
scope.

At least initially, the build would never fail because of vulnerabilities.

 [1] 
[https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html]


> Add OWASP Dependency Check to Flink Build
> -
>
> Key: FLINK-12119
> URL: https://issues.apache.org/jira/browse/FLINK-12119
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Major
>
> In order to obtain some visibility on the current known security 
> vulnerabilities in Flink's dependencies, it would be useful to include the 
> OWASP dependency check plugin [1] into our Maven build.
> By including it into flink-parent, we can get a summary of all dependencies of 
> all child projects by running
> {{mvn clean org.owasp:dependency-check-maven:5.0.0-M2:aggregate}}
> We should probably exclude some modules from the dependency-check. These 
> could be:
>  * flink-docs
>  * flink-fs-tests
>  * flink-yarn-tests
>  * flink-contrib
> Anything else? What about flink-python/flink-streaming-python?
> In addition, I propose to exclude all dependencies in the *system* or 
> *provided* scope.
> At least initially, the build would never fail because of vulnerabilities.
>  [1] 
> [https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-12119) Add OWASP Dependency Check to Flink Build

2019-04-08 Thread Konstantin Knauf (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-12119:
-
Description: 
In order to obtain some visibility on the current known security 
vulnerabilities in Flink's dependencies, it would be useful to include the 
OWASP dependency check plugin [1] into our Maven build.

By including it into flink-parent, we can get a summary of all dependencies of 
all child projects by running

{{mvn clean org.owasp:dependency-check-maven:5.0.0-M2:aggregate}}

We should probably exclude some modules from the dependency-check. These could 
be:
 * flink-docs
 * flink-end-to-end-tests
 * flink-fs-tests
 * flink-test-utils-parent
 * flink-yarn-tests
 * flink-contrib

Anything else? What about flink-python/flink-streaming-python?

In addition, I propose to exclude all dependencies in the *system* or *provided* 
scope.

At least initially, the build would never fail because of vulnerabilities.

 [1] 
[https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html]

  was:
In order to obtain some visibility on the current known security 
vulnerabilities in Flink's dependencies, it would be useful to include the 
OWASP dependency check plugin [1] into our Maven build.

By including it into flink-parent, we can get a summary of all dependencies of 
all child projects by running

{{mvn clean org.owasp:dependency-check-maven:5.0.0-M2:aggregate}}

We should probably exclude some modules from the dependency-check. These could 
be:
 * flink-docs
 * flink-examples
 * flink-end-to-end-tests
 * flink-fs-tests
 * flink-test-utils-parent
 * flink-yarn-tests
 * flink-contrib

Anything else? What about flink-python/flink-streaming-python?

In addition, I propose to exclude all dependencies in the *system* or *provided* 
scope.

At least initially, the build would never fail because of vulnerabilities.

 [1] 
[https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html]


> Add OWASP Dependency Check to Flink Build
> -
>
> Key: FLINK-12119
> URL: https://issues.apache.org/jira/browse/FLINK-12119
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Major
>
> In order to obtain some visibility on the current known security 
> vulnerabilities in Flink's dependencies, it would be useful to include the 
> OWASP dependency check plugin [1] into our Maven build.
> By including it into flink-parent, we can get a summary of all dependencies of 
> all child projects by running
> {{mvn clean org.owasp:dependency-check-maven:5.0.0-M2:aggregate}}
> We should probably exclude some modules from the dependency-check. These 
> could be:
>  * flink-docs
>  * flink-end-to-end-tests
>  * flink-fs-tests
>  * flink-test-utils-parent
>  * flink-yarn-tests
>  * flink-contrib
> Anything else? What about flink-python/flink-streaming-python?
> In addition, I propose to exclude all dependencies in the *system* or 
> *provided* scope.
> At least initially, the build would never fail because of vulnerabilities.
>  [1] 
> [https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html]





[GitHub] [flink] zhijiangW commented on a change in pull request #7822: [FLINK-11726][network] Refactor the creation of ResultPartition and InputGate into NetworkEnvironment

2019-04-08 Thread GitBox
zhijiangW commented on a change in pull request #7822: [FLINK-11726][network] 
Refactor the creation of ResultPartition and InputGate into NetworkEnvironment
URL: https://github.com/apache/flink/pull/7822#discussion_r272989781
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ##
 @@ -88,6 +104,12 @@
 
private final boolean enableCreditBased;
 
+   private final boolean enableNetworkDetailedMetrics;
+
+   private final ConcurrentHashMap 
allPartitions;
 
 Review comment:
   Sorry for missing this comment before.
   
   Yes, I agree with indexing by `executionId` and using task name + id for 
debugging.
   My current thought is to pass `ShuffleTaskInfo` into `ShuffleService` when 
creating the partition/gate, so #7835 would no longer be used. I will double 
check the details in the implementation.
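
A minimal sketch of the indexing idea discussed here, assuming a hypothetical `PartitionRegistrySketch` with plain `String` execution IDs. None of these names come from the PR; the map is keyed by execution ID only, and task name + id is kept purely as a debug label:

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical illustration only; not the NetworkEnvironment code under review. */
final class PartitionRegistrySketch {

    // Keyed by execution ID; the value is a human-readable label for logs.
    private final ConcurrentHashMap<String, String> debugLabelsByExecutionId =
            new ConcurrentHashMap<>();

    /** Registers an execution; returns false if the ID is already present. */
    boolean register(String executionId, String taskName) {
        String debugLabel = taskName + " (" + executionId + ")";
        return debugLabelsByExecutionId.putIfAbsent(executionId, debugLabel) == null;
    }

    /** Returns the debug label, or null if the execution ID is unknown. */
    String debugLabel(String executionId) {
        return debugLabelsByExecutionId.get(executionId);
    }
}
```

Lookups never depend on the task name, so two tasks with the same name cannot collide in this sketch.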


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-12119) Add OWASP Dependency Check to Flink Build

2019-04-08 Thread Konstantin Knauf (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-12119:
-
Description: 
In order to obtain some visibility on the current known security 
vulnerabilities in Flink's dependencies, it would be useful to include the 
OWASP dependency check plugin [1] into our Maven build.

By including it into flink-parent, we can get a summary of all dependencies of 
all child projects by running

{{mvn clean org.owasp:dependency-check-maven:5.0.0-M2:aggregate}}

We should probably exclude some modules from the dependency-check. These could 
be:
 * flink-docs
 * flink-examples
 * flink-end-to-end-tests
 * flink-fs-tests
 * flink-test-utils-parent
 * flink-yarn-tests
 * flink-contrib

Anything else? What about flink-python/flink-streaming-python?

In addition, I propose to exclude all dependencies in the *system* or *provided* 
scope.

At least initially, the build would never fail because of vulnerabilities.

 [1] 
[https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html]

  was:
In order to obtain some visibility on the current known security 
vulnerabilities in Flink's dependencies, it would be useful to include the 
OWASP dependency check plugin [1] into our Maven build.

By including it into flink-parent, we can get a summary of all dependencies of 
all child projects by running

{{mvn clean org.owasp:dependency-check-maven:5.0.0-M2:aggregate}}

We should probably exclude some modules from the dependency-check. These could 
be:
 * flink-dist
 * flink-docs
 * flink-examples
 * flink-tests
 * flink-end-to-end-tests
 * flink-fs-tests
 * flink-test-utils-parent
 * flink-yarn-tests
 * flink-contrib

Anything else? What about flink-python/flink-streaming-python?

In addition, I propose to exclude all dependencies in the *system* or *provided* 
scope.

At least initially, the build would never fail because of vulnerabilities.

 [1] 
[https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html]


> Add OWASP Dependency Check to Flink Build
> -
>
> Key: FLINK-12119
> URL: https://issues.apache.org/jira/browse/FLINK-12119
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Major
>
> In order to obtain some visibility on the current known security 
> vulnerabilities in Flink's dependencies, it would be useful to include the 
> OWASP dependency check plugin [1] into our Maven build.
> By including it into flink-parent, we can get a summary of all dependencies of 
> all child projects by running
> {{mvn clean org.owasp:dependency-check-maven:5.0.0-M2:aggregate}}
> We should probably exclude some modules from the dependency-check. These 
> could be:
>  * flink-docs
>  * flink-examples
>  * flink-end-to-end-tests
>  * flink-fs-tests
>  * flink-test-utils-parent
>  * flink-yarn-tests
>  * flink-contrib
> Anything else? What about flink-python/flink-streaming-python?
> In addition, I propose to exclude all dependencies in the *system* or 
> *provided* scope.
> At least initially, the build would never fail because of vulnerabilities.
>  [1] 
> [https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html]





[jira] [Updated] (FLINK-12119) Add OWASP Dependency Check to Flink Build

2019-04-08 Thread Konstantin Knauf (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantin Knauf updated FLINK-12119:
-
Description: 
In order to obtain some visibility on the current known security 
vulnerabilities in Flink's dependencies, it would be useful to include the 
OWASP dependency check plugin [1] into our Maven build.

By including it into flink-parent, we can get a summary of all dependencies of 
all child projects by running

{{mvn clean org.owasp:dependency-check-maven:5.0.0-M2:aggregate}}

We should probably exclude some modules from the dependency-check. These could 
be:
 * flink-dist
 * flink-docs
 * flink-examples
 * flink-tests
 * flink-end-to-end-tests
 * flink-fs-tests
 * flink-test-utils-parent
 * flink-yarn-tests
 * flink-contrib

Anything else? What about flink-python/flink-streaming-python?

In addition, I propose to exclude all dependencies in the *system* or *provided* 
scope.

At least initially, the build would never fail because of vulnerabilities.

 [1] 
[https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html]

  was:
In order to obtain some visibility on the current known security 
vulnerabilities in Flink's dependencies, it would be useful to include the 
OWASP dependency check plugin [1] into our Maven build.

By including it into flink-parent, we can get a summary of all dependencies of 
all child projects by running

{{mvn clean org.owasp:dependency-check-maven:5.0.0-M2:aggregate}}

We should probably exclude some modules from the dependency-check. These could 
be:
 * flink-dist
 * flink-docs
 * flink-examples
 * flink-tests
 * flink-shaded-yarn-tests
 * flink-end-to-end-tests
 * flink-fs-tests
 * flink-test-utils-parent
 * flink-yarn-tests
 * flink-contrib

Anything else? What about flink-python/flink-streaming-python?

In addition, I propose to exclude all dependencies in the *system* or *provided* 
scope.

At least initially, the build would never fail because of vulnerabilities.

 [1] 
[https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html]


> Add OWASP Dependency Check to Flink Build
> -
>
> Key: FLINK-12119
> URL: https://issues.apache.org/jira/browse/FLINK-12119
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Konstantin Knauf
>Assignee: Konstantin Knauf
>Priority: Major
>
> In order to obtain some visibility on the current known security 
> vulnerabilities in Flink's dependencies, it would be useful to include the 
> OWASP dependency check plugin [1] into our Maven build.
> By including it into flink-parent, we can get a summary of all dependencies of 
> all child projects by running
> {{mvn clean org.owasp:dependency-check-maven:5.0.0-M2:aggregate}}
> We should probably exclude some modules from the dependency-check. These 
> could be:
>  * flink-dist
>  * flink-docs
>  * flink-examples
>  * flink-tests
>  * flink-end-to-end-tests
>  * flink-fs-tests
>  * flink-test-utils-parent
>  * flink-yarn-tests
>  * flink-contrib
> Anything else? What about flink-python/flink-streaming-python?
> In addition, I propose to exclude all dependencies in the *system* or 
> *provided* scope.
> At least initially, the build would never fail because of vulnerabilities.
>  [1] 
> [https://jeremylong.github.io/DependencyCheck/dependency-check-maven/index.html]





[GitHub] [flink] flinkbot commented on issue #8122: [FLINK-12121] [State Backends] Use composition instead of inheritance for the InternalKeyContext logic in backend

2019-04-08 Thread GitBox
flinkbot commented on issue #8122: [FLINK-12121] [State Backends] Use 
composition instead of inheritance for the InternalKeyContext logic in backend
URL: https://github.com/apache/flink/pull/8122#issuecomment-480782903
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
   The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[jira] [Comment Edited] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-04-08 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812308#comment-16812308
 ] 

Tony Xintong Song edited comment on FLINK-12122 at 4/8/19 10:50 AM:


Hi [~till.rohrmann], I agree with you that we should spread tasks evenly across 
TaskManagers.

FYI, a similar behavior change is already included in the to-be-merged [blink 
branch|https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DynamicAssigningSlotManager.java],
 where we support three policies (random, spreading based on available slots, 
and spreading based on available resources) for distributing slots to 
TaskManagers for session mode.


was (Author: xintongsong):
Hi [~till.rohrmann], I agree with you that we should spread tasks evenly across 
TaskManagers.

FYI, a similar behavior change is already included in the to-be-merged [blink 
branch|https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DynamicAssigningSlotManager.java],
 where we support three policies (random, spreading based on available slots, 
and spreading based on available resources) for distributing slots to 
TaskManagers for session mode. [link title|http://example.com]

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of evenly spreading them out over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a 
> tendency to first fill up a TM before using another one. This is a regression 
> wrt the pre Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.





[jira] [Updated] (FLINK-12121) Use composition instead of inheritance for the InternalKeyContext logic in backend

2019-04-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-12121:
---
Labels: pull-request-available  (was: )

> Use composition instead of inheritance for the InternalKeyContext logic in 
> backend
> --
>
> Key: FLINK-12121
> URL: https://issues.apache.org/jira/browse/FLINK-12121
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yu Li
>Assignee: Yu Li
>Priority: Major
>  Labels: pull-request-available
>
> It is commonly 
> [recommended|https://stackoverflow.com/questions/2399544/difference-between-inheritance-and-composition]
>  to favor composition over inheritance in Java design, but currently the keyed 
> backend uses inheritance for the {{InternalKeyContext}} logic. Here we 
> propose to switch to composition.
> Another advantage of switching to composition is that we could remove 
> the requirement of a heap backend instance when constructing 
> {{HeapRestoreOperation}}, and further make sure all fields are final when 
> constructing the {{HeapKeyedStateBackend}}.





[GitHub] [flink] carp84 opened a new pull request #8122: [FLINK-12121] [State Backends] Use composition instead of inheritance for the InternalKeyContext logic in backend

2019-04-08 Thread GitBox
carp84 opened a new pull request #8122: [FLINK-12121] [State Backends] Use 
composition instead of inheritance for the InternalKeyContext logic in backend
URL: https://github.com/apache/flink/pull/8122
 
 
   ## What is the purpose of the change
   
   This PR changes the keyed backend to use composition instead of inheritance 
for the `InternalKeyContext` logic.
   
   
   ## Brief change log
   
   Introduced a new `InternalKeyContextImpl` and used it as a field in the keyed 
backend, instead of making `KeyedStateBackend` implement the 
`InternalKeyContext` interface.
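
As a rough illustration of the change log above (a simplified sketch, not the code in this PR): `KeyContext`, `KeyContextImpl` and `KeyedBackendSketch` are hypothetical stand-ins for `InternalKeyContext`, `InternalKeyContextImpl` and the keyed backend, and the real interface has more methods than shown here.

```java
import java.util.Objects;

/** Hypothetical, reduced key context with only the current-key accessors. */
interface KeyContext<K> {
    K getCurrentKey();

    void setCurrentKey(K key);
}

/** Standalone implementation that can be created before any backend exists. */
final class KeyContextImpl<K> implements KeyContext<K> {
    private K currentKey;

    @Override
    public K getCurrentKey() {
        return currentKey;
    }

    @Override
    public void setCurrentKey(K key) {
        this.currentKey = key;
    }
}

/** The backend holds the context as a final field instead of implementing it. */
final class KeyedBackendSketch<K> {
    private final KeyContext<K> keyContext;

    KeyedBackendSketch(KeyContext<K> keyContext) {
        this.keyContext = Objects.requireNonNull(keyContext);
    }

    void setCurrentKey(K key) {
        keyContext.setCurrentKey(key);
    }

    K getCurrentKey() {
        return keyContext.getCurrentKey();
    }
}
```

Because the context is a separate object, other components can be handed the same `KeyContext` instance without needing a reference to the backend itself.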
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as all tests under 
the `org.apache.flink.runtime.state` package.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[jira] [Comment Edited] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-04-08 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812308#comment-16812308
 ] 

Tony Xintong Song edited comment on FLINK-12122 at 4/8/19 10:46 AM:


Hi [~till.rohrmann], I agree with you that we should spread tasks evenly across 
TaskManagers.

FYI, a similar behavior change is already included in the to-be-merged [blink 
branch|https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DynamicAssigningSlotManager.java],
 where we support three policies (random, spreading based on available slots, 
and spreading based on available resources) for distributing slots to 
TaskManagers for session mode. [link title|http://example.com]


was (Author: xintongsong):
Hi [~till.rohrmann], I agree with you that we should spread tasks evenly across 
TaskManagers.

FYI, a similar behavior change is already included in the to-be-merged [blink 
branch|[https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DynamicAssigningSlotManager.java]],
 where we support three policies (random, spreading based on available slots, 
and spreading based on available resources) for distributing slots to 
TaskManagers for session mode.

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of evenly spreading them out over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a 
> tendency to first fill up a TM before using another one. This is a regression 
> wrt the pre Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.





[jira] [Assigned] (FLINK-10437) Some of keys under withDeprecatedKeys aren't marked as @depreacted

2019-04-08 Thread Ji Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ji Liu reassigned FLINK-10437:
--

Assignee: Ji Liu

> Some of keys under withDeprecatedKeys aren't marked as @depreacted
> --
>
> Key: FLINK-10437
> URL: https://issues.apache.org/jira/browse/FLINK-10437
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Assignee: Ji Liu
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> As the title says. For example, {{RestOptions#BIND_ADDRESS}} is declared with 
> {{withDeprecatedKeys(WebOptions.ADDRESS.key())}}, but {{WebOptions.ADDRESS}} 
> isn't marked as deprecated.





[jira] [Comment Edited] (FLINK-7916) Remove NetworkStackThroughputITCase

2019-04-08 Thread Ji Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812155#comment-16812155
 ] 

Ji Liu edited comment on FLINK-7916 at 4/8/19 10:40 AM:


Agreed, I think moving these tests to _org.apache.flink.test.manual_ under 
_flink-tests_ is a good idea. Is this still open, [~till.rohrmann]?


was (Author: tianchen92):
Agreed, I think moving these tests to _org.apache.flink.test.manual_ under 
_flink-tests_ is a good idea.

> Remove NetworkStackThroughputITCase
> ---
>
> Key: FLINK-7916
> URL: https://issues.apache.org/jira/browse/FLINK-7916
> Project: Flink
>  Issue Type: Task
>  Components: Runtime / Network, Tests
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Priority: Major
>
> Flink's code base contains the {{NetworkStackThroughputITCase}} which is not 
> really a test. Moreover, it is marked as {{Ignored}}. I propose to remove this 
> test because it is more of a benchmark. We could think about creating a 
> benchmark project where we move these kinds of "tests".
> In general, I think we should remove ignored tests if they won't be fixed 
> immediately. The danger is far too high that we forget about them and then we 
> only keep the maintenance burden of them. This is especially true for the 
> above-mentioned test case.





[GitHub] [flink] pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-08 Thread GitBox
pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r272976560
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Utility functions for the plugin mechanism.
+ */
+public final class PluginUtils {
 
 Review comment:
   I would merge this and `PluginManagerSingleton` together, or even merge them 
with `PluginManager`. It's unclear to me what the benefit of keeping those 
two separate is, and the drawback is that it increases confusion and the chance 
that someone will not find the other one when looking for a way to initialize 
plugins.




[GitHub] [flink] pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-08 Thread GitBox
pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r272975453
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Utility functions for the plugin mechanism.
+ */
+public final class PluginUtils {
+
+   private static final Logger log = 
LoggerFactory.getLogger(PluginUtils.class);
+
+   private PluginUtils() {
+   throw new AssertionError("Singleton class.");
+   }
+
+   public static void initPluginManagerSingletonFromRootFolder(@Nullable 
Path pluginsRootPath) {
 
 Review comment:
   Please either overload this method or use `Optional`.
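
The two options could look roughly like this. It is a sketch only: `PluginInitSketch` and its method names are hypothetical, and the returned strings merely stand in for the real initialization work.

```java
import java.nio.file.Path;
import java.util.Optional;

/** Hypothetical stand-in for the utility under review; not Flink's actual code. */
final class PluginInitSketch {

    private PluginInitSketch() {}

    /** Option 1a (overload): called when no plugins root folder is configured. */
    static String init() {
        return "no plugins";
    }

    /** Option 1b (overload): called with a non-null plugins root folder. */
    static String init(Path pluginsRootPath) {
        return "plugins from " + pluginsRootPath;
    }

    /** Option 2 (Optional): a possibly absent folder is explicit in the signature. */
    static String initFromOptional(Optional<Path> pluginsRootPath) {
        return pluginsRootPath.map(p -> init(p)).orElseGet(() -> init());
    }
}
```

Either way, callers no longer pass `null` to signal "no plugins folder"; the absence is visible in the method signature.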




[GitHub] [flink] pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-08 Thread GitBox
pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r272975830
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginUtils.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Utility functions for the plugin mechanism.
+ */
+public final class PluginUtils {
+
+   private static final Logger log = 
LoggerFactory.getLogger(PluginUtils.class);
+
+   private PluginUtils() {
+   throw new AssertionError("Singleton class.");
+   }
+
+   public static void initPluginManagerSingletonFromRootFolder(@Nullable 
Path pluginsRootPath) {
+   Collection pluginDescriptorsForDirectory = 
Collections.emptyList();
+   if (pluginsRootPath != null) {
+   try {
+   pluginDescriptorsForDirectory =
+   new 
DirectoryBasedPluginDescriptorsFactory(pluginsRootPath).createPluginDescriptors();
+   } catch (IOException ioex) {
+   log.warn("Exception when locating plugins in 
root folder {}. Ignoring plugins.", ioex);
 
 Review comment:
   Same question: should we ignore the exceptions? If a plugin is missing, 
the job will fail later with a less readable error message. I would prefer 
to throw the exception, and just make sure that the code never throws if the 
`plugins` directory is empty.
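
   A minimal sketch of that behaviour, assuming that a missing `plugins` folder should be treated like an empty one. `PluginDiscoverySketch` and `findPluginDirectories` are hypothetical names; the sketch only lists sub-directories and does not reproduce `DirectoryBasedPluginDescriptorsFactory`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

/** Hypothetical helper; it lists plugin sub-directories and is not the PR's factory class. */
final class PluginDiscoverySketch {

    private PluginDiscoverySketch() {}

    static List<Path> findPluginDirectories(Path pluginsRootPath) throws IOException {
        // Assumption: a missing plugins folder means "no plugins", not an error.
        if (!Files.isDirectory(pluginsRootPath)) {
            return Collections.emptyList();
        }
        List<Path> pluginDirectories = new ArrayList<>();
        // An empty folder yields an empty list; real I/O failures propagate to the caller.
        try (Stream<Path> children = Files.list(pluginsRootPath)) {
            children.filter(Files::isDirectory).forEach(pluginDirectories::add);
        }
        return pluginDirectories;
    }
}
```

   The caller then sees an `IOException` only when listing an existing folder actually fails, which is the case where an early, readable error is wanted.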




[jira] [Commented] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-04-08 Thread Tony Xintong Song (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812308#comment-16812308
 ] 

Tony Xintong Song commented on FLINK-12122:
---

Hi [~till.rohrmann], I agree with you that we should spread tasks evenly across 
TaskManagers.

FYI, a similar behavior change is already included in the to-be-merged [blink 
branch|[https://github.com/apache/flink/blob/blink/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DynamicAssigningSlotManager.java]],
 where we support three policies (random, spreading based on available slots, 
and spreading based on available resources) for distributing slots to TaskManagers 
in session mode.

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of evenly spreading them out over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a 
> tendency to first fill up a TM before using another one. This is a regression 
> wrt the pre Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-10437) Some of keys under withDeprecatedKeys aren't marked as @depreacted

2019-04-08 Thread Ji Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812271#comment-16812271
 ] 

Ji Liu edited comment on FLINK-10437 at 4/8/19 10:12 AM:
-

I have checked the usages of _ConfigOption.withDeprecatedKeys_ and found that 
only the case mentioned above meets the requirements. Could you please help 
review this PR? Thanks very much! [~rmetzger] 
[[https://github.com/apache/flink/pull/8121]]


was (Author: tianchen92):
[~Tison] I have checked usages of _ConfigOption.withDeparecatedKeys_ and find 
only the case mentioned above meet the requirements. Could you please help to 
review this PR? thanks very much! 
[PR|[https://github.com/apache/flink/pull/8121]]

> Some of keys under withDeprecatedKeys aren't marked as @depreacted
> --
>
> Key: FLINK-10437
> URL: https://issues.apache.org/jira/browse/FLINK-10437
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> as title. For example {{RestOptions#BIND_ADDRESS}} is 
> {{withDeprecatedKeys(WebOptions.ADDRESS.key())}}, but {{WebOptions.ADDRESS}} 
> isn't marked as deprecated.
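
A check for this kind of mismatch could be sketched with reflection, as below. The classes are hypothetical stand-ins and do not use Flink's `ConfigOption` API: `WebOptionsSketch` only mimics an options holder, and the key strings are illustrative.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical option holder; the fields only mimic the shape of an options class. */
final class WebOptionsSketch {
    // Used as a deprecated key elsewhere, but not annotated: the situation the issue describes.
    static final String ADDRESS = "web.address";

    @Deprecated
    static final String PORT = "web.port";
}

final class DeprecationCheckSketch {

    private DeprecationCheckSketch() {}

    /** Returns the names of the given fields that lack the @Deprecated annotation. */
    static List<String> missingDeprecatedAnnotation(Class<?> holder, String... fieldNames)
            throws NoSuchFieldException {
        List<String> missing = new ArrayList<>();
        for (String name : fieldNames) {
            Field field = holder.getDeclaredField(name);
            // @Deprecated has runtime retention, so it is visible via reflection.
            if (!field.isAnnotationPresent(Deprecated.class)) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

Calling `missingDeprecatedAnnotation(WebOptionsSketch.class, "ADDRESS", "PORT")` reports only `ADDRESS`, the field that is referenced as deprecated but not annotated.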





[jira] [Assigned] (FLINK-8902) Re-scaling job sporadically fails with KeeperException

2019-04-08 Thread Andrey Zagrebin (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrey Zagrebin reassigned FLINK-8902:
--

Assignee: Andrey Zagrebin

> Re-scaling job sporadically fails with KeeperException
> --
>
> Key: FLINK-8902
> URL: https://issues.apache.org/jira/browse/FLINK-8902
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.5.0, 1.6.0
> Environment: Commit: 80020cb
> Hadoop: 2.8.3
> YARN
>  
>Reporter: Gary Yao
>Assignee: Andrey Zagrebin
>Priority: Critical
>  Labels: flip6
> Fix For: 1.7.3, 1.6.5
>
>
> *Description*
>  Re-scaling a job with {{bin/flink modify -p }} sporadically 
> fails with a {{KeeperException}}
> *Steps to reproduce*
>  # Submit job to Flink cluster with flip6 enabled running on YARN (session 
> mode).
>  # Re-scale job (5-20 times)
> *Stacktrace (client)*
> {noformat}
> org.apache.flink.util.FlinkException: Could not rescale job 
> 61e2e99db2e959ebd94e40f9c5e816bc.
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$modify$8(CliFrontend.java:766)
>   at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:954)
>   at org.apache.flink.client.cli.CliFrontend.modify(CliFrontend.java:757)
>   at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1037)
>   at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1095)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:422)
>   at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>   at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>   at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1095)
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
> not restore from temporary rescaling savepoint. This might indicate that the 
> savepoint 
> hdfs://172.31.33.72:9000/flinkha/savepoints/savepoint-61e2e9-fdb3d05a0035 got 
> corrupted. Deleting this savepoint as a precaution.
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$rescaleOperators$3(JobMaster.java:525)
>   at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>   at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>   at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:295)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:150)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>   at 
> akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: 
> org.apache.flink.runtime.jobmaster.exceptions.JobModificationException: Could 
> not restore from temporary rescaling savepoint. This might indicate that the 
> savepoint 
> hdfs://172.31.33.72:9000/flinkha/savepoints/savepoint-61e2e9-fdb3d05a0035 got 
> corrupted. Deleting this savepoint as a precaution.
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$restoreExecutionGraphFromRescalingSavepoint$17(JobMaster.java:1317)
>   at 
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>   at 
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>   at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.ut

[GitHub] [flink] zhijiangW commented on issue #7835: [FLINK-11750][network] Replace IntermediateResultPartitionID with ResultPartitionID in ResultPartitionDeploymentDescriptor

2019-04-08 Thread GitBox
zhijiangW commented on issue #7835: [FLINK-11750][network] Replace 
IntermediateResultPartitionID with ResultPartitionID in 
ResultPartitionDeploymentDescriptor
URL: https://github.com/apache/flink/pull/7835#issuecomment-480765630
 
 
   Thanks for the review and the suggestions above, @azagrebin.
   
   Your concern is reasonable, and I think this issue covers two further 
aspects:
   
   1. How should the `ResultPartitionID` be generated? 
   `ResultPartitionID` is just a unique identifier for a partition, and I even 
think the `ResultPartitionID` might not include the `ExecutionAttemptID` in the future. 
   
   2. Who should generate the `ResultPartitionID`?
   It could be created by `ShuffleMaster` or `ShuffleService`. In the current 
implementation, `ResultPartitionID` is generated by `ShuffleMaster` in 
`InputChannelDeploymentDescriptor` on the consumer side, but it is generated by 
`ShuffleService` on the producer side, so it is not consistent ATM.
   
   The motivation for this PR is to simplify the interface method 
`ShuffleService#createResultPartitionWriter` so that it no longer takes an 
explicit `ExecutionAttemptID` parameter, which can be obtained from the 
`ResultPartitionDeploymentDescriptor` instead. The creation of 
`ResultPartitionID` is then consistent on both sides.
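
   The interface simplification could be pictured as in the sketch below. All types are simplified, hypothetical stand-ins (the real Flink classes and method signatures differ), and `UUID` merely replaces the actual ID types.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in for ResultPartitionID: partition id plus producing attempt id. */
final class ResultPartitionIdSketch {
    final UUID partitionId;       // stands in for IntermediateResultPartitionID
    final UUID producerAttemptId; // stands in for ExecutionAttemptID

    ResultPartitionIdSketch(UUID partitionId, UUID producerAttemptId) {
        this.partitionId = Objects.requireNonNull(partitionId);
        this.producerAttemptId = Objects.requireNonNull(producerAttemptId);
    }
}

/** Hypothetical stand-in for ResultPartitionDeploymentDescriptor. */
final class PartitionDescriptorSketch {
    final ResultPartitionIdSketch resultPartitionId;

    PartitionDescriptorSketch(ResultPartitionIdSketch resultPartitionId) {
        this.resultPartitionId = Objects.requireNonNull(resultPartitionId);
    }
}

final class ShuffleServiceSketch {

    private ShuffleServiceSketch() {}

    // Before: the attempt id is passed explicitly next to the partition id.
    static String createWriter(UUID executionAttemptId, UUID partitionId) {
        return partitionId + "@" + executionAttemptId;
    }

    // After: the descriptor already carries the full id, so one parameter is enough.
    static String createWriter(PartitionDescriptorSketch descriptor) {
        ResultPartitionIdSketch id = descriptor.resultPartitionId;
        return id.partitionId + "@" + id.producerAttemptId;
    }
}
```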
   




[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r272965435
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/expressions/PlannerExpressionParserImpl.scala
 ##
 @@ -232,7 +232,7 @@ object PlannerExpressionParserImpl extends JavaTokenParsers
   lazy val literalExpr: PackratParser[Expression] =
 numberLiteral | doubleQuoteStringLiteral | singleQuoteStringLiteral | 
boolLiteral
 
-  lazy val fieldReference: PackratParser[UnresolvedFieldReferenceExpression] = 
(STAR | ident) ^^ {
+  lazy val fieldReference: PackratParser[UnresolvedReferenceExpression] = 
(STAR | ident) ^^ {
 sym => unresolvedFieldRef(sym)
 
 Review comment:
   Please also update the API util class.




[jira] [Closed] (FLINK-12106) Jobmanager is killing FINISHED taskmanger containers, causing exception in still running Taskmanagers an

2019-04-08 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-12106.
-
Resolution: Duplicate

Thanks for reporting this issue [~Hutchins]. [~maguowei] is right and this 
issue should be fixed with FLINK-10941. Hence, closing this issue as a 
duplicate.

> Jobmanager is killing FINISHED taskmanger containers, causing exception in 
> still running Taskmanagers an
> 
>
> Key: FLINK-12106
> URL: https://issues.apache.org/jira/browse/FLINK-12106
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN
>Affects Versions: 1.7.2
> Environment: Hadoop:  hdp/2.5.6.0-40
> Flink: 2.7.2
>Reporter: John
>Priority: Major
>
> When running a single flink job on YARN, some of the taskmanager containers 
> reach the FINISHED state before others.  It appears that, after receiving 
> final execution state FINISHED from a taskmanager, jobmanager is waiting ~68 
> seconds and then freeing the associated slot in the taskmanager.  After an 
> additional 60 seconds, jobmanager is stopping the same taskmanager because 
> TaskExecutor exceeded the idle timeout.
> Meanwhile, other taskmanagers are still working to complete the job.  Within 
> 10 seconds after the taskmanager container above is stopped, the remaining 
> task managers receive an exception due to loss of connection to the stopped 
> taskmanager.  These exceptions result in job failure.
>  
> Relevant logs:
> 2019-04-03 13:49:00,013 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Registering TaskManager with ResourceID 
> container_1553017480503_0158_01_38 
> (akka.tcp://flink@hadoop4:42745/user/taskmanager_0) at ResourceManager
> 2019-04-03 13:49:05,900 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Registering TaskManager with ResourceID 
> container_1553017480503_0158_01_59 
> (akka.tcp://flink@hadoop9:55042/user/taskmanager_0) at ResourceManager
>  
>  
> 2019-04-03 13:48:51,132 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Received new container: 
> container_1553017480503_0158_01_77 - Remaining pending container 
> requests: 6
> 2019-04-03 13:48:52,862 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner    
>               -     
> -Dlog.file=/hadoop/yarn/log/application_1553017480503_0158/container_1553017480503_0158_01_77/taskmanager.log
> 2019-04-03 13:48:57,490 INFO  
> org.apache.flink.runtime.io.network.netty.NettyServer         - Successful 
> initialization (took 202 ms). Listening on SocketAddress 
> /192.168.230.69:40140.
> 2019-04-03 13:49:12,575 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Registering TaskManager with ResourceID 
> container_1553017480503_0158_01_77 
> (akka.tcp://flink@hadoop9:51525/user/taskmanager_0) at ResourceManager
> 2019-04-03 13:49:12,631 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Allocated 
> slot for AllocationID\{42fed3e5a136240c23cc7b394e3249e9}.
> 2019-04-03 14:58:15,188 INFO  
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - 
> Un-registering task and sending final execution state FINISHED to JobManager 
> for task DataSink 
> (com.anovadata.alexflinklib.sinks.bucketing.BucketingOutputFormat@26874f2c) 
> a4b5fb32830d4561147b2714828109e2.
> 2019-04-03 14:59:23,049 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing 
> idle slot [AllocationID\{42fed3e5a136240c23cc7b394e3249e9}].
> 2019-04-03 14:59:23,058 INFO  
> org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable      - Free slot 
> TaskSlot(index:0, state:ACTIVE, resource profile: 
> ResourceProfile\{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, 
> directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, 
> networkMemoryInMB=2147483647}, allocationId: 
> AllocationID\{42fed3e5a136240c23cc7b394e3249e9}, jobId: 
> a6c4e367698c15cdf168d19a89faff1d).
> 2019-04-03 15:00:02,641 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Stopping container container_1553017480503_0158_01_77.
> 2019-04-03 15:00:02,646 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Closing TaskExecutor connection 
> container_1553017480503_0158_01_77 because: TaskExecutor exceeded the 
> idle timeout.
>  
>  
> 2019-04-03 13:48:48,902 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner    
>               -     
> -Dlog.file=/data1/hadoop/yarn/log/application_1553017480503_0158/container_1553017480503_0158_01_59/taskmanager.log
> 2019-04-03 14:59:24,677 INFO  
> org.apache.parquet.hadoop.InternalParquetRecordWriter         - Flushing mem 
> columnStore to file. allocated memory: 109479981
> 2019-04-0

[jira] [Updated] (FLINK-12123) Upgrade Jepsen to 0.1.13 in flink-jepsen

2019-04-08 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-12123:
-
Fix Version/s: (was: 1.8.0)
   1.9.0

> Upgrade Jepsen to 0.1.13 in flink-jepsen
> 
>
> Key: FLINK-12123
> URL: https://issues.apache.org/jira/browse/FLINK-12123
> Project: Flink
>  Issue Type: Improvement
>  Components: Test Infrastructure
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> Raise version of the jepsen dependency in {{flink-jepsen/project.clj}} to 
> 0.1.13.





[jira] [Updated] (FLINK-12123) Upgrade Jepsen to 0.1.13 in flink-jepsen

2019-04-08 Thread Gary Yao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-12123:
-
Description: Raise version of the jepsen dependency in 
{{flink-jepsen/project.clj}} to 0.1.13.  (was: Raise version of the jepsen 
dependency in {{flink-jepsen/project.clj}} to 0.1.11.)

> Upgrade Jepsen to 0.1.13 in flink-jepsen
> 
>
> Key: FLINK-12123
> URL: https://issues.apache.org/jira/browse/FLINK-12123
> Project: Flink
>  Issue Type: Improvement
>  Components: Test Infrastructure
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Raise version of the jepsen dependency in {{flink-jepsen/project.clj}} to 
> 0.1.13.





[jira] [Created] (FLINK-12123) Upgrade Jepsen to 0.1.13 in flink-jepsen

2019-04-08 Thread Gary Yao (JIRA)
Gary Yao created FLINK-12123:


 Summary: Upgrade Jepsen to 0.1.13 in flink-jepsen
 Key: FLINK-12123
 URL: https://issues.apache.org/jira/browse/FLINK-12123
 Project: Flink
  Issue Type: Improvement
  Components: Test Infrastructure
Reporter: Gary Yao
Assignee: Gary Yao
 Fix For: 1.8.0


Raise version of the jepsen dependency in {{flink-jepsen/project.clj}} to 
0.1.11.





[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-08 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r272965384
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/AbstractUpdateRankFunction.java
 ##
 @@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.rank;
+
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedRecordComparator;
+import org.apache.flink.table.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.util.LRUMap;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.Collector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.function.Supplier;
+
+/**
+ * Base class for Update Rank Function.
+ */
+abstract class AbstractUpdateRankFunction extends AbstractRankFunction
 
 Review comment:
   There is only one subclass of this abstract class, so I don't think the 
abstraction is necessary.




[GitHub] [flink] twalthr commented on a change in pull request #8062: [FLINK-11884][table] Implement expression resolution on top of new Expressions

2019-04-08 Thread GitBox
twalthr commented on a change in pull request #8062:  [FLINK-11884][table] 
Implement expression resolution on top of new Expressions
URL: https://github.com/apache/flink/pull/8062#discussion_r272964598
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
 ##
 @@ -78,12 +83,12 @@ public static SymbolExpression symbol(TableSymbol symbol) {
return new SymbolExpression(symbol);
}
 
-   public static UnresolvedFieldReferenceExpression 
unresolvedFieldRef(String name) {
-   return new UnresolvedFieldReferenceExpression(name);
+   public static UnresolvedReferenceExpression unresolvedFieldRef(String 
name) {
+   return new UnresolvedReferenceExpression(name);
}
 
public static TableReferenceExpression tableRef(String name, Table 
table) {
 
 Review comment:
   True, registering the table under the hood is not a perfect design.




[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-08 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r272964014
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/rank/SortedMap.java
 ##
 @@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.rank;
+
+import org.apache.flink.table.dataformat.BaseRow;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.Supplier;
+
+/**
+ * SortedMap stores mapping from sort key to records list, each record is 
BaseRow type.
+ * SortedMap could also track rank number of each records.
+ *
+ * @param  Type of the sort key
+ */
+public class SortedMap {
 
 Review comment:
   It's better to use a different name, since this one conflicts with the JDK's 
`SortedMap`. We also have `SortedMapTypeInfo` and a serializer, and it's hard to 
tell which `SortedMap` they are intended for without looking into the code 
details.




[jira] [Commented] (FLINK-7464) Add useful build-in Aggregate function into TabalAPI&SQL

2019-04-08 Thread Lifei Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812283#comment-16812283
 ] 

Lifei Chen commented on FLINK-7464:
---

Hi [~sunjincheng121], I think `FIRST_VALUE` and `LAST_VALUE` are very useful, 
and I also found these two built-in aggregate functions in the docs of 
[aliyun|[https://help.aliyun.com/knowledge_detail/62791.html]].

However, the sub-task was closed without being resolved. Is there any reason 
why progress on this feature has stopped?

Thanks!

> Add useful build-in Aggregate function into TabalAPI&SQL
> 
>
> Key: FLINK-7464
> URL: https://issues.apache.org/jira/browse/FLINK-7464
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API
>Reporter: sunjincheng
>Priority: Major
>
> In this JIRA, we will create sub-tasks for adding specific built-in aggregate 
> functions, such as FIRST_VALUE, LAST_VALUE, BloomFilterCount, etc.
> Anybody is welcome to add sub-tasks.
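
For illustration, the intended semantics of FIRST_VALUE and LAST_VALUE could be sketched with a simple accumulator. This is an assumption-laden sketch, not Flink's aggregate function API: `FirstLastValueSketch` is a hypothetical class, values are taken in arrival order, and nulls are not skipped.

```java
/** Hypothetical accumulator; it keeps the first and the last value in arrival order. */
final class FirstLastValueSketch<T> {
    private T first;
    private T last;
    private boolean seenAny;

    void accumulate(T value) {
        // The first value is fixed once; the last value is overwritten on every call.
        if (!seenAny) {
            first = value;
            seenAny = true;
        }
        last = value;
    }

    /** Returns the first accumulated value, or null if nothing was accumulated. */
    T firstValue() {
        return first;
    }

    /** Returns the most recently accumulated value, or null if nothing was accumulated. */
    T lastValue() {
        return last;
    }
}
```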



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-04-08 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12122?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-12122:
-

Assignee: Till Rohrmann

> Spread out tasks evenly across all available registered TaskManagers
> 
>
> Key: FLINK-12122
> URL: https://issues.apache.org/jira/browse/FLINK-12122
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Affects Versions: 1.6.4, 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
> Fix For: 1.7.3, 1.9.0, 1.8.1
>
>
> With Flip-6, we changed the default behaviour of how slots are assigned to 
> {{TaskManagers}}. Instead of evenly spreading them out over all registered 
> {{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a 
> tendency to first fill up a TM before using another one. This is a regression 
> wrt the pre Flip-6 code.
> I suggest changing the behaviour so that we try to evenly distribute slots 
> across all available {{TaskManagers}} by considering how many of their slots 
> are already allocated.





[jira] [Created] (FLINK-12122) Spread out tasks evenly across all available registered TaskManagers

2019-04-08 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-12122:
-

 Summary: Spread out tasks evenly across all available registered 
TaskManagers
 Key: FLINK-12122
 URL: https://issues.apache.org/jira/browse/FLINK-12122
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.7.2, 1.6.4, 1.8.0
Reporter: Till Rohrmann
 Fix For: 1.7.3, 1.9.0, 1.8.1


With Flip-6, we changed the default behaviour of how slots are assigned to 
{{TaskManagers}}. Instead of evenly spreading them out over all registered 
{{TaskManagers}}, we randomly pick slots from {{TaskManagers}} with a tendency 
to first fill up a TM before using another one. This is a regression wrt the 
pre Flip-6 code.

I suggest changing the behaviour so that we try to evenly distribute slots 
across all available {{TaskManagers}} by considering how many of their slots 
are already allocated.
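
The suggested selection could be sketched as follows. This is only an illustration under stated assumptions, not the code of Flink's slot manager: `TaskManagerSlotsSketch` and `SpreadOutSketch` are hypothetical classes, and "how many slots are already allocated" is interpreted here as the allocated share of a TaskManager's slots.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical view of a registered TaskManager and its slot usage. */
final class TaskManagerSlotsSketch {
    final String id;
    final int totalSlots;
    final int allocatedSlots;

    TaskManagerSlotsSketch(String id, int totalSlots, int allocatedSlots) {
        this.id = id;
        this.totalSlots = totalSlots;
        this.allocatedSlots = allocatedSlots;
    }

    boolean hasFreeSlot() {
        return allocatedSlots < totalSlots;
    }

    double utilization() {
        return (double) allocatedSlots / totalSlots;
    }
}

final class SpreadOutSketch {

    private SpreadOutSketch() {}

    /** Picks the TaskManager with a free slot and the lowest share of allocated slots. */
    static Optional<TaskManagerSlotsSketch> pickLeastUtilized(
            List<TaskManagerSlotsSketch> taskManagers) {
        return taskManagers.stream()
            .filter(TaskManagerSlotsSketch::hasFreeSlot)
            .min(Comparator.comparingDouble(TaskManagerSlotsSketch::utilization));
    }
}
```

Picking the least utilized TaskManager for every new slot request fills all TaskManagers at a similar pace, instead of filling one up before touching the next.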





[jira] [Comment Edited] (FLINK-10437) Some of keys under withDeprecatedKeys aren't marked as @depreacted

2019-04-08 Thread Ji Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812271#comment-16812271
 ] 

Ji Liu edited comment on FLINK-10437 at 4/8/19 9:34 AM:


[~Tison] I have checked the usages of _ConfigOption.withDeprecatedKeys_ and found
that only the case mentioned above meets the requirements. Could you please help
review this PR? Thanks very much!
[PR|https://github.com/apache/flink/pull/8121]


was (Author: tianchen92):
[~Tison] I have checked usages of _ConfigOption.withDeprecatedKeys_ and find
only the case mentioned above meet the requirements. Could please help review
this PR? thanks very much! [PR|https://github.com/apache/flink/pull/8121]

> Some of keys under withDeprecatedKeys aren't marked as @depreacted
> --
>
> Key: FLINK-10437
> URL: https://issues.apache.org/jira/browse/FLINK-10437
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> as title. For example {{RestOptions#BIND_ADDRESS}} is 
> {{withDeprecatedKeys(WebOptions.ADDRESS.key())}}, but {{WebOptions.ADDRESS}} 
> isn't marked as deprecated.





[jira] [Commented] (FLINK-10437) Some of keys under withDeprecatedKeys aren't marked as @depreacted

2019-04-08 Thread Ji Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812271#comment-16812271
 ] 

Ji Liu commented on FLINK-10437:


[~Tison] I have checked the usages of _ConfigOption.withDeprecatedKeys_ and found
that only the case mentioned above meets the requirements. Could you please help
review this PR? Thanks very much! [PR|https://github.com/apache/flink/pull/8121]

> Some of keys under withDeprecatedKeys aren't marked as @depreacted
> --
>
> Key: FLINK-10437
> URL: https://issues.apache.org/jira/browse/FLINK-10437
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> as title. For example {{RestOptions#BIND_ADDRESS}} is 
> {{withDeprecatedKeys(WebOptions.ADDRESS.key())}}, but {{WebOptions.ADDRESS}} 
> isn't marked as deprecated.





[jira] [Commented] (FLINK-10984) Move flink-shaded-hadoop to flink-shaded

2019-04-08 Thread sunjincheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812268#comment-16812268
 ] 

sunjincheng commented on FLINK-10984:
-

+1 for this proposal.
I think this JIRA should first add the `flink-shaded-hadoop` module to
`flink-shaded` and then remove `flink-shaded-hadoop2` from Flink, so I opened
a first PR that adds the `flink-shaded-hadoop` module to `flink-shaded`:
[https://github.com/apache/flink-shaded/pull/58]. I would appreciate it if you
could check whether this PR looks reasonable to you. [~Zentol]

> Move flink-shaded-hadoop to flink-shaded
> 
>
> Key: FLINK-10984
> URL: https://issues.apache.org/jira/browse/FLINK-10984
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, BuildSystem / Shaded
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> To allow reasonable dependency management we should move flink-shaded-hadoop
> to flink-shaded, with each supported version having its own module and
> dependency management.





[GitHub] [flink] tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers

2019-04-08 Thread GitBox
tillrohrmann commented on a change in pull request #8002: 
[FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters 
Suppliers
URL: https://github.com/apache/flink/pull/8002#discussion_r272951340
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 ##
 @@ -160,7 +149,7 @@ public MetricRegistryImpl(MetricRegistryConfiguration config) {
    }
    reporters.add(reporterInstance);
 
-   String delimiterForReporter = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, String.valueOf(globalDelimiter));
+   String delimiterForReporter = metricConfig.getString(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, String.valueOf(globalDelimiter));
 
 Review comment:
   Same here, we could have a method `MetricConfig#getReporterScopeDelimiter()`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-12008) Support read a whole directory or multiple input data files for read apis of HadoopInputs

2019-04-08 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812265#comment-16812265
 ] 

Fabian Hueske commented on FLINK-12008:
---

Yes, IIRC we recently added support for multiple paths to Flink's 
{{FileInputFormat}} as well.

> Support read a whole directory or multiple input data files for read apis of 
> HadoopInputs
> -
>
> Key: FLINK-12008
> URL: https://issues.apache.org/jira/browse/FLINK-12008
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Hadoop Compatibility
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>
> Currently, the read APIs provided by {{HadoopInputs}} can only read one path.
> I think that is not flexible enough. We should support reading a whole
> directory or multiple input files.
> Hadoop provides {{org.apache.hadoop.mapred.FileInputFormat.setInputPaths()}}
> to support this requirement.
> Spark's {{sequenceFile}} API calls this API
> ([https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1049]).
> Flink calls {{org.apache.hadoop.mapred.FileInputFormat.addInputPath}}, which
> only supports one path.





[GitHub] [flink] tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers

2019-04-08 Thread GitBox
tillrohrmann commented on a change in pull request #8002: 
[FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters 
Suppliers
URL: https://github.com/apache/flink/pull/8002#discussion_r272955876
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
 ##
 @@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.util.TestReporter;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link ReporterSetup}.
+ */
+public class ReporterSetupTest extends TestLogger {
+
+   /**
+    * TestReporter1 class only for type differentiation.
+    */
+   static class TestReporter1 extends TestReporter {
+   }
+
+   /**
+    * TestReporter2 class only for type differentiation.
+    */
+   static class TestReporter2 extends TestReporter {
+   }
+
+   /**
+    * Verifies that a reporter can be configured with all its arguments being forwarded.
+    */
+   @Test
+   public void testReporterArgumentForwarding() {
+       final Configuration config = new Configuration();
+
+       config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, ReporterSetupTest.TestReporter1.class.getName());
+       config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter.arg1", "value1");
+       config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter.arg2", "value2");
+
+       final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+       Assert.assertEquals(1, reporterSetups.size());
+
+       final ReporterSetup reporterSetup = reporterSetups.get(0);
+       Assert.assertEquals("reporter", reporterSetup.getName());
+       Assert.assertEquals("value1", reporterSetup.getConfiguration().getString("arg1", null));
+       Assert.assertEquals("value2", reporterSetup.getConfiguration().getString("arg2", null));
+       Assert.assertEquals(ReporterSetupTest.TestReporter1.class.getName(), reporterSetup.getConfiguration().getString("class", null));
+   }
+
+   /**
+    * Verifies that multiple reporters can be configured with all their arguments being forwarded.
+*/
+   @Test
+   public void testSeveralReportersWithArgumentForwarding() {
 
 Review comment:
   Shouldn't this test subsume `testReporterArgumentForwarding`?




[GitHub] [flink] tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers

2019-04-08 Thread GitBox
tillrohrmann commented on a change in pull request #8002: 
[FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters 
Suppliers
URL: https://github.com/apache/flink/pull/8002#discussion_r272954758
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 ##
 @@ -113,19 +114,12 @@ public MetricRegistryImpl(MetricRegistryConfiguration config) {
            // by default, don't report anything
            LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
        } else {
-           // we have some reporters so
-           for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
-               String namedReporter = reporterConfiguration.f0;
-               Configuration reporterConfig = reporterConfiguration.f1;
-
-               final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
-               if (className == null) {
-                   LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
-                   continue;
-               }
+           for (ReporterSetup reporterSetup : reporterConfigurations) {
 
 Review comment:
   Just a thought, instead of having the `ReporterSetup` we could extend the 
`MetricReporter` interface to give us `MetricReporterWithNameAndScopeDelimiter` 
which has a method `getName` and `getScopeDelimiter`. Behind the scenes it 
could still use the `MetricConfig`.
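
To make the suggestion concrete, here is a minimal sketch of such an extended interface. It is only an illustration under stated assumptions: `Reporter` is a reduced stand-in for Flink's `MetricReporter` (the real interface has more methods), `java.util.Properties` stands in for `MetricConfig`, the shortened interface name and the `wrap` helper are hypothetical, and the key `scope.delimiter` is assumed rather than taken from the PR.

```java
import java.util.Properties;

/** Hypothetical sketch of the reviewer's idea; these are stand-ins, not Flink's interfaces. */
final class NamedReporterSketch {

    /** Reduced stand-in for Flink's MetricReporter. */
    interface Reporter {
        void open(Properties config);
    }

    /** The proposed extension: a reporter that also knows its name and scope delimiter. */
    interface ReporterWithNameAndScopeDelimiter extends Reporter {
        String getName();

        char getScopeDelimiter();
    }

    /** Wraps a reporter and derives the delimiter from its configuration. */
    static ReporterWithNameAndScopeDelimiter wrap(
            String name, Properties config, Reporter delegate, char defaultDelimiter) {
        // "scope.delimiter" is an assumed key name for this sketch.
        String configured = config.getProperty("scope.delimiter", String.valueOf(defaultDelimiter));
        // Fall back to the default when the configured value is not a single character.
        char delimiter = configured.length() == 1 ? configured.charAt(0) : defaultDelimiter;

        return new ReporterWithNameAndScopeDelimiter() {
            @Override
            public void open(Properties c) {
                delegate.open(c);
            }

            @Override
            public String getName() {
                return name;
            }

            @Override
            public char getScopeDelimiter() {
                return delimiter;
            }
        };
    }
}
```

With this shape the registry would only talk to the wrapper and would not need to read the name or the delimiter from the raw configuration itself.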




[jira] [Updated] (FLINK-10984) Move flink-shaded-hadoop to flink-shaded

2019-04-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10984:
---
Labels: pull-request-available  (was: )

> Move flink-shaded-hadoop to flink-shaded
> 
>
> Key: FLINK-10984
> URL: https://issues.apache.org/jira/browse/FLINK-10984
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, BuildSystem / Shaded
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> To allow reasonable dependency management we should move flink-shaded-hadoop
> to flink-shaded, with each supported version having its own module and
> dependency management.





[GitHub] [flink] tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers

2019-04-08 Thread GitBox
tillrohrmann commented on a change in pull request #8002: 
[FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters 
Suppliers
URL: https://github.com/apache/flink/pull/8002#discussion_r272951210
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 ##
 @@ -113,19 +114,12 @@ public MetricRegistryImpl(MetricRegistryConfiguration config) {
            // by default, don't report anything
            LOG.info("No metrics reporter configured, no metrics will be exposed/reported.");
        } else {
-           // we have some reporters so
-           for (Tuple2<String, Configuration> reporterConfiguration: reporterConfigurations) {
-               String namedReporter = reporterConfiguration.f0;
-               Configuration reporterConfig = reporterConfiguration.f1;
-
-               final String className = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, null);
-               if (className == null) {
-                   LOG.error("No reporter class set for reporter " + namedReporter + ". Metrics might not be exposed/reported.");
-                   continue;
-               }
+           for (ReporterSetup reporterSetup : reporterConfigurations) {
+               final String namedReporter = reporterSetup.getName();
+               final MetricConfig metricConfig = reporterSetup.getConfiguration();
 
                try {
-                   String configuredPeriod = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
+                   String configuredPeriod = metricConfig.getString(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, null);
 
 Review comment:
   Should we hide the configuration field accesses behind 
`MetricConfig#getReporterInterval()`?
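
A typed accessor of this kind might look like the sketch below. It is not the PR's code: `java.util.Properties` stands in for `MetricConfig`, the class `ReporterIntervalSketch` and its `Interval` type are hypothetical, and both the key name `interval` and the value format `<number> <TIME_UNIT>` (for example `10 SECONDS`) are assumptions made for this example.

```java
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/** Hypothetical typed view over a reporter's configuration; not part of Flink. */
final class ReporterIntervalSketch {

    /** A parsed reporting interval such as "10 SECONDS". */
    static final class Interval {
        final long period;
        final TimeUnit unit;

        Interval(long period, TimeUnit unit) {
            this.period = period;
            this.unit = unit;
        }
    }

    /** Returns the configured interval, or empty if none is set. */
    static Optional<Interval> getReporterInterval(Properties reporterConfig) {
        // "interval" is an assumed key name for this sketch.
        String configured = reporterConfig.getProperty("interval");
        if (configured == null) {
            return Optional.empty();
        }
        String[] parts = configured.trim().split("\\s+");
        try {
            if (parts.length != 2) {
                throw new IllegalArgumentException("expected '<number> <unit>'");
            }
            long period = Long.parseLong(parts[0]);
            TimeUnit unit = TimeUnit.valueOf(parts[1].toUpperCase(Locale.ROOT));
            return Optional.of(new Interval(period, unit));
        } catch (IllegalArgumentException e) {
            // Also covers NumberFormatException, which is a subclass.
            throw new IllegalArgumentException(
                "Cannot parse reporter interval '" + configured + "'", e);
        }
    }
}
```

Callers such as the registry constructor would then no longer need to know the key name or the string format, and parse errors are reported in one place.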




[GitHub] [flink] tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers

2019-04-08 Thread GitBox
tillrohrmann commented on a change in pull request #8002: 
[FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters 
Suppliers
URL: https://github.com/apache/flink/pull/8002#discussion_r272956271
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java
 ##
 @@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.runtime.metrics.util.TestReporter;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Optional;
+
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link ReporterSetup}.
+ */
+public class ReporterSetupTest extends TestLogger {
+
+   /**
+    * TestReporter1 class only for type differentiation.
+    */
+   static class TestReporter1 extends TestReporter {
+   }
+
+   /**
+    * TestReporter2 class only for type differentiation.
+    */
+   static class TestReporter2 extends TestReporter {
+   }
+
+   /**
+    * Verifies that a reporter can be configured with all its arguments being forwarded.
+    */
+   @Test
+   public void testReporterArgumentForwarding() {
+       final Configuration config = new Configuration();
+
+       config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, ReporterSetupTest.TestReporter1.class.getName());
+       config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter.arg1", "value1");
+       config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter.arg2", "value2");
+
+       final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+       Assert.assertEquals(1, reporterSetups.size());
+
+       final ReporterSetup reporterSetup = reporterSetups.get(0);
+       Assert.assertEquals("reporter", reporterSetup.getName());
+       Assert.assertEquals("value1", reporterSetup.getConfiguration().getString("arg1", null));
+       Assert.assertEquals("value2", reporterSetup.getConfiguration().getString("arg2", null));
+       Assert.assertEquals(ReporterSetupTest.TestReporter1.class.getName(), reporterSetup.getConfiguration().getString("class", null));
+   }
+
+   /**
+    * Verifies that multiple reporters can be configured with all their arguments being forwarded.
+    */
+   @Test
+   public void testSeveralReportersWithArgumentForwarding() {
+       final Configuration config = new Configuration();
+
+       config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, ReporterSetupTest.TestReporter1.class.getName());
+       config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter1.arg1", "value1");
+       config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter1.arg2", "value2");
+
+       config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, ReporterSetupTest.TestReporter2.class.getName());
+       config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter2.arg1", "value1");
+       config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter2.arg3", "value3");
+
+       final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config);
+
+       Assert.assertEquals(2, reporterSetups.size());
+
+       final Optional<ReporterSetup> reporter1Config = reporterSetups.stream()
+           .filter(c -> "reporter1".equals(c.getName()))
+           .findFirst();
+       Assert.

[GitHub] [flink] tillrohrmann commented on a change in pull request #8002: [FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters Suppliers

2019-04-08 Thread GitBox
tillrohrmann commented on a change in pull request #8002: 
[FLINK-11923][metrics] MetricRegistryConfiguration provides MetricReporters 
Suppliers
URL: https://github.com/apache/flink/pull/8002#discussion_r272950654
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
 ##
 @@ -87,10 +86,14 @@
 
    private boolean isShutdown;
 
+   public MetricRegistryImpl(MetricRegistryConfiguration config) {
+       this(config, Collections.emptyList());
+   }
+
    /**
     * Creates a new MetricRegistry and starts the configured reporter.
     */
-   public MetricRegistryImpl(MetricRegistryConfiguration config) {
+   public MetricRegistryImpl(MetricRegistryConfiguration config, List<ReporterSetup> reporterConfigurations) {
 
 Review comment:
   Could this be a `Collection`?




[GitHub] [flink] flinkbot commented on issue #8121: [FLINK-10437]Some of keys under withDeprecatedKeys aren't marked as @…

2019-04-08 Thread GitBox
flinkbot commented on issue #8121: [FLINK-10437]Some of keys under 
withDeprecatedKeys aren't marked as @…
URL: https://github.com/apache/flink/pull/8121#issuecomment-480754474
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied
according to the order of the review items. For consensus, approval by a Flink
committer or PMC member is required.

Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] tianchen92 opened a new pull request #8121: [FLINK-10437]Some of keys under withDeprecatedKeys aren't marked as @…

2019-04-08 Thread GitBox
tianchen92 opened a new pull request #8121: [FLINK-10437]Some of keys under 
withDeprecatedKeys aren't marked as @…
URL: https://github.com/apache/flink/pull/8121
 
 
   …depreacted
   
   
   
   ## What is the purpose of the change
   
   *RestOptions#BIND_ADDRESS is withDeprecatedKeys(WebOptions.ADDRESS.key()), 
but WebOptions.ADDRESS isn't marked as deprecated. In this case replace 
withDeprecatedKeys with withFallbackKeys.*
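
   The difference between the two kinds of alternative keys can be illustrated with a small toy model. This is a sketch under assumptions, not Flink's `Configuration` or `ConfigOption` code: the class `FallbackKeySketch`, its `AlternativeKey` type and the warning text are hypothetical, and the keys `rest.bind-address` and `web.address` are assumed to be the keys behind `RestOptions#BIND_ADDRESS` and `WebOptions.ADDRESS`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Toy model for illustration; Flink's real option lookup is more involved. */
final class FallbackKeySketch {

    /** An alternative key plus a flag saying whether using it counts as deprecated. */
    static final class AlternativeKey {
        final String key;
        final boolean isDeprecated;

        private AlternativeKey(String key, boolean isDeprecated) {
            this.key = key;
            this.isDeprecated = isDeprecated;
        }

        static AlternativeKey fallback(String key) {
            return new AlternativeKey(key, false);
        }

        static AlternativeKey deprecated(String key) {
            return new AlternativeKey(key, true);
        }
    }

    /**
     * Looks up the primary key first, then the alternatives in order.
     * Only alternatives marked as deprecated add a warning.
     */
    static Optional<String> lookup(
            Map<String, String> config,
            String primaryKey,
            List<AlternativeKey> alternatives,
            List<String> warnings) {
        String value = config.get(primaryKey);
        if (value != null) {
            return Optional.of(value);
        }
        for (AlternativeKey alternative : alternatives) {
            String alternativeValue = config.get(alternative.key);
            if (alternativeValue != null) {
                if (alternative.isDeprecated) {
                    warnings.add("Deprecated key '" + alternative.key
                        + "' used instead of '" + primaryKey + "'");
                }
                return Optional.of(alternativeValue);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("web.address", "localhost");
        List<String> warnings = new ArrayList<>();

        // Declared as a plain fallback key: the value is found and nothing is flagged.
        Optional<String> address = lookup(
            config, "rest.bind-address",
            Arrays.asList(AlternativeKey.fallback("web.address")), warnings);
        System.out.println(address.orElse("<unset>") + ", warnings: " + warnings.size());
    }
}
```

   In this model both variants resolve the same value; they differ only in whether a user who still sets the alternative key is told that it is deprecated, which is why a key that is not deprecated fits the fallback variant better.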
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable )
   




[jira] [Updated] (FLINK-10437) Some of keys under withDeprecatedKeys aren't marked as @depreacted

2019-04-08 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10437:
---
Labels: pull-request-available  (was: )

> Some of keys under withDeprecatedKeys aren't marked as @depreacted
> --
>
> Key: FLINK-10437
> URL: https://issues.apache.org/jira/browse/FLINK-10437
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Priority: Major
>  Labels: pull-request-available
>
> as title. For example {{RestOptions#BIND_ADDRESS}} is 
> {{withDeprecatedKeys(WebOptions.ADDRESS.key())}}, but {{WebOptions.ADDRESS}} 
> isn't marked as deprecated.





[GitHub] [flink] tzulitai commented on a change in pull request #8078: [FLINK-12066] [State Backends] Remove StateSerializerProvider field in keyed state backends

2019-04-08 Thread GitBox
tzulitai commented on a change in pull request #8078: [FLINK-12066] [State 
Backends] Remove StateSerializerProvider field in keyed state backends
URL: https://github.com/apache/flink/pull/8078#discussion_r272922134
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapInternalKeyContext.java
 ##
 @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+
+/**
+ * The {@link InternalKeyContext} for heap keyed backend.
+ *
+ * @param <K> Type of the key.
+ */
+public class HeapInternalKeyContext<K> implements InternalKeyContext<K> {
+   private final KeyGroupRange keyGroupRange;
+   private final int numberOfKeyGroups;
+
+   private K currentKey;
+   private int currentKeyGroupIndex;
+   private TypeSerializer<K> keySerializer;
+
+   HeapInternalKeyContext(KeyGroupRange keyGroupRange, int numberOfKeyGroups, TypeSerializer<K> keySerializer) {
+       this.keyGroupRange = keyGroupRange;
+       this.numberOfKeyGroups = numberOfKeyGroups;
+       this.keySerializer = keySerializer;
+   }
+
+   @Override
+   public K getCurrentKey() {
+       return currentKey;
+   }
+
+   @Override
+   public int getCurrentKeyGroupIndex() {
+       return currentKeyGroupIndex;
+   }
+
+   @Override
+   public int getNumberOfKeyGroups() {
+       return numberOfKeyGroups;
+   }
+
+   @Override
+   public KeyGroupRange getKeyGroupRange() {
+       return keyGroupRange;
+   }
+
+   @Override
+   public TypeSerializer<K> getKeySerializer() {
+       return keySerializer;
+   }
+
+   public void setCurrentKey(K currentKey) {
+       this.currentKey = currentKey;
+   }
+
+   void setCurrentKeyGroupIndex(int currentKeyGroupIndex) {
+       this.currentKeyGroupIndex = currentKeyGroupIndex;
+   }
+
+   public void setKeySerializer(TypeSerializer<K> keySerializer) {
+       this.keySerializer = keySerializer;
 
 Review comment:
   Same here, some sanity checks for the setter methods.




[GitHub] [flink] tzulitai commented on a change in pull request #8078: [FLINK-12066] [State Backends] Remove StateSerializerProvider field in keyed state backends

2019-04-08 Thread GitBox
tzulitai commented on a change in pull request #8078: [FLINK-12066] [State 
Backends] Remove StateSerializerProvider field in keyed state backends
URL: https://github.com/apache/flink/pull/8078#discussion_r272921407
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotRestore.java
 ##
 @@ -27,7 +28,7 @@
  * TODO find better name?
  */
 @Internal
-public interface StateSnapshotRestore {
+public interface StateSnapshotRestore<K> {
 
 Review comment:
   Add `K` parameter description to the Javadocs




[GitHub] [flink] tzulitai commented on a change in pull request #8078: [FLINK-12066] [State Backends] Remove StateSerializerProvider field in keyed state backends

2019-04-08 Thread GitBox
tzulitai commented on a change in pull request #8078: [FLINK-12066] [State 
Backends] Remove StateSerializerProvider field in keyed state backends
URL: https://github.com/apache/flink/pull/8078#discussion_r272922020
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapInternalKeyContext.java
 ##
 @@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.heap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.KeyGroupRange;
+
+/**
+ * The {@link InternalKeyContext} for heap keyed backend.
+ *
+ * @param <K> Type of the key.
+ */
+public class HeapInternalKeyContext<K> implements InternalKeyContext<K> {
+   private final KeyGroupRange keyGroupRange;
+   private final int numberOfKeyGroups;
+
+   private K currentKey;
+   private int currentKeyGroupIndex;
+   private TypeSerializer<K> keySerializer;
+
+   HeapInternalKeyContext(KeyGroupRange keyGroupRange, int numberOfKeyGroups, TypeSerializer<K> keySerializer) {
+       this.keyGroupRange = keyGroupRange;
+       this.numberOfKeyGroups = numberOfKeyGroups;
+       this.keySerializer = keySerializer;
 
 Review comment:
   Maybe some sanity checks (e.g. non-null, non-negative) would make sense here.
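
A minimal sketch of the kind of checks the comment asks for, assuming plain `java.util.Objects` rather than any Flink utility. `KeyContextSketch` and its `Object`-typed fields are hypothetical stand-ins for illustration, not the class from the pull request.

```java
import java.util.Objects;

/** Hypothetical stand-in for a key context; not the class under review. */
final class KeyContextSketch {
    private final Object keyGroupRange;  // placeholder for a KeyGroupRange-like object
    private final int numberOfKeyGroups;
    private final Object keySerializer;  // placeholder for a TypeSerializer-like object

    KeyContextSketch(Object keyGroupRange, int numberOfKeyGroups, Object keySerializer) {
        // Fail fast on null references instead of failing later at first use.
        this.keyGroupRange = Objects.requireNonNull(keyGroupRange, "keyGroupRange must not be null");
        this.keySerializer = Objects.requireNonNull(keySerializer, "keySerializer must not be null");
        // Reject a negative key-group count.
        if (numberOfKeyGroups < 0) {
            throw new IllegalArgumentException(
                "numberOfKeyGroups must be non-negative, but was " + numberOfKeyGroups);
        }
        this.numberOfKeyGroups = numberOfKeyGroups;
    }

    int getNumberOfKeyGroups() {
        return numberOfKeyGroups;
    }
}
```

Checking in the constructor means a bad argument is reported where the object is built, not at some later call site.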




[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272944389
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
+
+   return new NetworkEnvironmentConfiguration(
+   numNetworkBuffers,
+   pageSize,
+   initialRequestBackoff,
+   maxRequestBackoff,
+   buffersPerChannel,
+   extraBuffersPerGate,
+   isCreditBased,
+   nettyConfig);
+   }
+
+   /**
+* C

[jira] [Commented] (FLINK-12121) Use composition instead of inheritance for the InternalKeyContext logic in backend

2019-04-08 Thread Yu Li (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-12121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812243#comment-16812243
 ] 

Yu Li commented on FLINK-12121:
---

Marked as a blocker of FLINK-12066.

> Use composition instead of inheritance for the InternalKeyContext logic in 
> backend
> --
>
> Key: FLINK-12121
> URL: https://issues.apache.org/jira/browse/FLINK-12121
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yu Li
>Assignee: Yu Li
>Priority: Major
>
> It is commonly 
> [recommended|https://stackoverflow.com/questions/2399544/difference-between-inheritance-and-composition]
>  to favor composition over inheritance in Java design, but the keyed 
> backend currently uses inheritance for the {{InternalKeyContext}} logic. 
> We propose switching to composition.
> Another advantage of composition is that we could remove 
> the requirement of a heap backend instance when constructing 
> {{HeapRestoreOperation}}, and further make sure all fields are final when 
> constructing the {{HeapKeyedStateBackend}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12121) Use composition instead of inheritance for the InternalKeyContext logic in backend

2019-04-08 Thread Yu Li (JIRA)
Yu Li created FLINK-12121:
-

 Summary: Use composition instead of inheritance for the 
InternalKeyContext logic in backend
 Key: FLINK-12121
 URL: https://issues.apache.org/jira/browse/FLINK-12121
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Yu Li
Assignee: Yu Li


It is commonly 
[recommended|https://stackoverflow.com/questions/2399544/difference-between-inheritance-and-composition]
 to favor composition over inheritance in Java design, but the keyed 
backend currently uses inheritance for the {{InternalKeyContext}} logic. 
We propose switching to composition.

Another advantage of composition is that we could remove 
the requirement of a heap backend instance when constructing 
{{HeapRestoreOperation}}, and further make sure all fields are final when 
constructing the {{HeapKeyedStateBackend}}.
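
The proposal can be illustrated with a small sketch. All names here (`KeyContext`, `SimpleKeyContext`, `BackendSketch`) are hypothetical and only show the shape of the change; they are not the Flink classes named in this issue.

```java
/** Hypothetical key-context abstraction; names are illustrative only. */
interface KeyContext<K> {
    K getCurrentKey();

    void setCurrentKey(K key);
}

/** Standalone implementation that can be created before any backend exists. */
final class SimpleKeyContext<K> implements KeyContext<K> {
    private K currentKey;

    @Override
    public K getCurrentKey() {
        return currentKey;
    }

    @Override
    public void setCurrentKey(K key) {
        this.currentKey = key;
    }
}

/** The backend holds a key context (composition) instead of extending one. */
final class BackendSketch<K> {
    // The field can be final because the context is injected at construction.
    private final KeyContext<K> keyContext;

    BackendSketch(KeyContext<K> keyContext) {
        this.keyContext = keyContext;
    }

    void setCurrentKey(K key) {
        keyContext.setCurrentKey(key);
    }

    K getCurrentKey() {
        return keyContext.getCurrentKey();
    }
}
```

Because the context is a separate object, other components (a restore operation, for instance) could be handed the same context without needing a backend instance first.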





[GitHub] [flink] KurtYoung commented on issue #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-08 Thread GitBox
KurtYoung commented on issue #8109: [FLINK-12017][table-planner-blink] Support 
translation from Rank/Deduplicate to StreamTransformation
URL: https://github.com/apache/flink/pull/8109#issuecomment-480743717
 
 
   BTW, I think we need some dedicated tests for all rank functions. 




[jira] [Commented] (FLINK-10437) Some of keys under withDeprecatedKeys aren't marked as @depreacted

2019-04-08 Thread TisonKun (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812233#comment-16812233
 ] 

TisonKun commented on FLINK-10437:
--

Yes. You can decide whether to replace it with {{withFallbackKeys}} or mark it as 
{{@Deprecated}}.

> Some of keys under withDeprecatedKeys aren't marked as @depreacted
> --
>
> Key: FLINK-10437
> URL: https://issues.apache.org/jira/browse/FLINK-10437
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 1.7.0
>Reporter: TisonKun
>Priority: Major
>
> As the title says. For example, {{RestOptions#BIND_ADDRESS}} is declared with 
> {{withDeprecatedKeys(WebOptions.ADDRESS.key())}}, but {{WebOptions.ADDRESS}} 
> isn't marked as deprecated.





[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-08 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r272878173
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/EqualiserCodeGenerator.scala
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.dataformat.{BaseRow, BinaryRow}
+import org.apache.flink.table.generated.{GeneratedRecordEqualiser, 
RecordEqualiser}
+import org.apache.flink.table.`type`.{DateType, InternalType, PrimitiveType, 
RowType, TimeType, TimestampType}
+
+class EqualiserCodeGenerator(fieldTypes: Seq[InternalType]) {
+
+  private val BASE_ROW = className[BaseRow]
 
 Review comment:
   `CodeGenUtils` already has this and `BINARY_ROW`




[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-08 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r272880935
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionBase.java
 ##
 @@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.deduplicate;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.RecordEqualiser;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class to deduplicate on keys and keeps only first row or last row.
+ */
+public interface DeduplicateFunctionBase {
+
+   default void processLastRow(BaseRow preRow, BaseRow currentRow, boolean 
generateRetraction,
+   boolean stateCleaningEnabled, ValueState 
pkRow, RecordEqualiser equaliser,
+   Collector out) throws Exception {
+   // should be accumulate msg.
+   
Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
+   // ignore same record
+   if (!stateCleaningEnabled && preRow != null &&
+   equaliser.equalsWithoutHeader(preRow, currentRow)) {
+   return;
+   }
+   pkRow.update(currentRow);
+   if (preRow != null && generateRetraction) {
+   preRow.setHeader(BaseRowUtil.RETRACT_MSG);
+   out.collect(preRow);
+   }
+   out.collect(currentRow);
+   }
+
+   default void processFirstRow(BaseRow preRow, BaseRow currentRow, 
boolean generateRetraction,
+   boolean stateCleaningEnabled, ValueState 
pkRow, RecordEqualiser equaliser,
+   Collector out) throws Exception {
+   // should be accumulate msg.
+   
Preconditions.checkArgument(BaseRowUtil.isAccumulateMsg(currentRow));
+   // ignore record with timestamp bigger than preRow
+   if (!isFirstRow(preRow)) {
+   return;
+   }
+
+   pkRow.update(currentRow);
+   out.collect(currentRow);
+   }
+
+   default boolean isFirstRow(BaseRow preRow) {
 
 Review comment:
   This name is easily confused with FirstRow/LastRow




[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-08 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r272879137
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
 ##
 @@ -73,4 +92,92 @@ class StreamExecDeduplicate(
   .item("order", orderString)
   }
 
+  //~ ExecNode methods 
---
+
+  override protected def translateToPlanInternal(
+  tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
+
+// TODO checkInput is not acc retract after FLINK- is done
+val inputIsAccRetract = false
+
+if (inputIsAccRetract) {
+  throw new TableException(
+"Deduplicate: Retraction on Deduplicate is not supported yet.\n" +
+  "please re-check sql grammar. \n" +
+  "Note: Deduplicate should not follow a non-windowed GroupBy 
aggregation.")
+}
+
+val inputTransform = getInputNodes.get(0).translateToPlan(tableEnv)
+ .asInstanceOf[StreamTransformation[BaseRow]]
+
+val rowTypeInfo = 
inputTransform.getOutputType.asInstanceOf[BaseRowTypeInfo]
+
+val generateRetraction = true
 
 Review comment:
   I think keeping the first row will not generate retractions?
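
To make the point concrete, here is a simplified simulation of the two modes, loosely following the `processFirstRow`/`processLastRow` logic quoted in this pull request. It is a sketch only: `DeduplicateSketch` is a hypothetical name, a `HashMap` stands in for keyed state, rows are `{key, value}` string pairs, and the same-record check via the equaliser is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DeduplicateSketch {
    /** Keep-first-row: emit a row only if its key is new; nothing is ever retracted. */
    static List<String> keepFirst(List<String[]> keyedRows) {
        Map<String, String> state = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String[] row : keyedRows) {
            // putIfAbsent returns null only when the key had no earlier row.
            if (state.putIfAbsent(row[0], row[1]) == null) {
                out.add("+" + row[1]);
            }
        }
        return out;
    }

    /** Keep-last-row: each new row replaces the previous one, which is retracted first. */
    static List<String> keepLast(List<String[]> keyedRows) {
        Map<String, String> state = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (String[] row : keyedRows) {
            String previous = state.put(row[0], row[1]);
            if (previous != null) {
                out.add("-" + previous); // retraction of the row being replaced
            }
            out.add("+" + row[1]);
        }
        return out;
    }
}
```

In this simulation only the keep-last mode ever produces a `-` message, which is the behavior the comment is pointing at.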




[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-08 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r272878814
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
 ##
 @@ -73,4 +92,92 @@ class StreamExecDeduplicate(
   .item("order", orderString)
   }
 
+  //~ ExecNode methods 
---
+
+  override protected def translateToPlanInternal(
+  tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
+
+// TODO checkInput is not acc retract after FLINK- is done
 
 Review comment:
   FLINK-?




[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-08 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r272879175
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecExchange.scala
 ##
 @@ -50,4 +65,41 @@ class StreamExecExchange(
   newDistribution: RelDistribution): StreamExecExchange = {
 new StreamExecExchange(cluster, traitSet, newInput, newDistribution)
   }
+
+  //~ ExecNode methods 
---
+
+  override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = 
{
+List(getInput.asInstanceOf[ExecNode[StreamTableEnvironment, _]])
+  }
+
+  override protected def translateToPlanInternal(
+  tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
+val inputTransform = getInputNodes.get(0).translateToPlan(tableEnv)
+ .asInstanceOf[StreamTransformation[BaseRow]]
 
 Review comment:
   proper indent




[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-08 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r272878861
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecDeduplicate.scala
 ##
 @@ -73,4 +92,92 @@ class StreamExecDeduplicate(
   .item("order", orderString)
   }
 
+  //~ ExecNode methods 
---
+
+  override protected def translateToPlanInternal(
+  tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
+
+// TODO checkInput is not acc retract after FLINK- is done
+val inputIsAccRetract = false
+
+if (inputIsAccRetract) {
+  throw new TableException(
+"Deduplicate: Retraction on Deduplicate is not supported yet.\n" +
+  "please re-check sql grammar. \n" +
+  "Note: Deduplicate should not follow a non-windowed GroupBy 
aggregation.")
+}
+
+val inputTransform = getInputNodes.get(0).translateToPlan(tableEnv)
+ .asInstanceOf[StreamTransformation[BaseRow]]
 
 Review comment:
   proper indent




[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-08 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r272882131
 
 

 ##
 File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/deduplicate/DeduplicateFunctionBase.java
 ##
 @@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.deduplicate;
+
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.util.BaseRowUtil;
+import org.apache.flink.table.generated.RecordEqualiser;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class to deduplicate on keys and keeps only first row or last row.
+ */
+public interface DeduplicateFunctionBase {
 
 Review comment:
   it's not acting like an interface




[GitHub] [flink] KurtYoung commented on a change in pull request #8109: [FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to StreamTransformation

2019-04-08 Thread GitBox
KurtYoung commented on a change in pull request #8109: 
[FLINK-12017][table-planner-blink] Support translation from Rank/Deduplicate to 
StreamTransformation
URL: https://github.com/apache/flink/pull/8109#discussion_r272878296
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/EqualiserCodeGenerator.scala
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.codegen.CodeGenUtils._
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.dataformat.{BaseRow, BinaryRow}
+import org.apache.flink.table.generated.{GeneratedRecordEqualiser, 
RecordEqualiser}
+import org.apache.flink.table.`type`.{DateType, InternalType, PrimitiveType, 
RowType, TimeType, TimestampType}
+
+class EqualiserCodeGenerator(fieldTypes: Seq[InternalType]) {
+
+  private val BASE_ROW = className[BaseRow]
+  private val BINARY_ROW = className[BinaryRow]
+  private val RECORD_EQUALISER = className[RecordEqualiser]
+  private val LEFT_INPUT = "left"
+  private val RIGHT_INPUT = "right"
+
+  def generateRecordEqualiser(name: String): GeneratedRecordEqualiser = {
+// ignore time zone
+val ctx = CodeGeneratorContext(new TableConfig)
+val className = newName(name)
+val header =
+  s"""
+ |if ($LEFT_INPUT.getHeader() != $RIGHT_INPUT.getHeader()) {
+ |  return false;
+ |}
+   """.stripMargin
+
+val codes = for (i <- fieldTypes.indices) yield {
+  val fieldType = fieldTypes(i)
+  val fieldTypeTerm = primitiveTypeTermForType(fieldType)
+  val result = s"cmp$i"
+  val leftNullTerm = "leftIsNull$" + i
+  val rightNullTerm = "rightIsNull$" + i
+  val leftFieldTerm = "leftField$" + i
+  val rightFieldTerm = "rightField$" + i
+  val equalsCode = if (isInternalPrimitive(fieldType)) {
+s"$leftFieldTerm == $rightFieldTerm"
+  } else if (isBaseRow(fieldType)) {
+val equaliserGenerator =
+  new 
EqualiserCodeGenerator(fieldType.asInstanceOf[RowType].getFieldTypes)
+val generatedEqualiser = equaliserGenerator
+ .generateRecordEqualiser("field$" + i + 
"GeneratedEqualiser")
 
 Review comment:
   proper indent




[GitHub] [flink] pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-08 Thread GitBox
pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r272922309
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginDescriptorsFactory.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.PathMatcher;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used to create a collection of {@link PluginDescriptor} based 
on directory structure for a given plugin
+ * root folder.
+ *
+ * The expected structure is as follows: the given plugins root folder, 
containing the plugins folder. One plugin folder
+ * contains all resources (jar files) belonging to a plugin. The name of the 
plugin folder becomes the plugin id.
+ * 
+ * plugins-root-folder/
+ *  |plugin-a/ (folder of plugin a)
+ *  ||-plugin-a-1.jar (the jars containing the 
classes of plugin a)
+ *  ||-plugin-a-2.jar
+ *  ||-...
+ *  |
+ *  |plugin-b/
+ *  ||-plugin-b-1.jar
+ * ...   |-...
+ * 
+ */
+public class DirectoryBasedPluginDescriptorsFactory {
+
+   private static final Logger log = 
LoggerFactory.getLogger(DirectoryBasedPluginDescriptorsFactory.class);
+
+   private static final PathMatcher JAR_FILE_MATCHER =
+   FileSystems.getDefault().getPathMatcher("glob:**.jar");
+
+   private final Path pluginsRootDir;
+
+   public DirectoryBasedPluginDescriptorsFactory(Path pluginsRootDir) {
+   this.pluginsRootDir = pluginsRootDir;
+   }
+
+   //TODO this can go into an interface if there would be more strategies 
in the future.
+   public Collection createPluginDescriptors() throws 
IOException {
+   return Files.list(pluginsRootDir)
+   .filter((Path path) -> Files.isDirectory(path))
+   
.map(FunctionUtils.uncheckedFunction(this::createPluginDescriptorForSubDirectory))
+   .filter(Optional::isPresent)
+   .map(Optional::get)
+   .collect(Collectors.toList());
+   }
+
+   private Optional 
createPluginDescriptorForSubDirectory(Path subDirectory) {
+   Optional jarURLsFromDirectory = 
createJarURLsFromDirectory(subDirectory);
+   if (jarURLsFromDirectory.isPresent()) {
 
 Review comment:
   nit: invert the if/else conditions, or maybe use the `Optional#map` method 
(`jarURLsFromDirectory.map(...)`)?
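
A small self-contained sketch of the two styles, for comparison. `OptionalMapSketch` and the integer "jar count" are hypothetical; the real method in the pull request works on different types.

```java
import java.util.Optional;

final class OptionalMapSketch {
    /** Explicit branching on isPresent(), the style the nit refers to. */
    static Optional<String> describeWithIf(Optional<Integer> jarCount) {
        if (jarCount.isPresent()) {
            return Optional.of("plugin with " + jarCount.get() + " jar(s)");
        } else {
            return Optional.empty();
        }
    }

    /** Same behavior with Optional#map: an empty input stays empty. */
    static Optional<String> describeWithMap(Optional<Integer> jarCount) {
        return jarCount.map(count -> "plugin with " + count + " jar(s)");
    }
}
```

Both methods return the same result for present and empty inputs; the `map` variant simply removes the branch.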




[jira] [Commented] (FLINK-4785) Flink string parser doesn't handle string fields containing two consecutive double quotes

2019-04-08 Thread Flavio Pompermaier (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1681#comment-1681
 ] 

Flavio Pompermaier commented on FLINK-4785:
---

Which URL is no longer valid? This one?
[https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/flink/Csv2RowExample.java]

I can still access it. I don't know whether this problem has been solved, 
but I think it has not (you should try running that main class).
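
For readers unfamiliar with the case in the issue title, the sketch below shows one common way to read such fields, assuming the RFC 4180 convention that a doubled quote inside a quoted field stands for one literal quote. This is an assumption for illustration: `QuotedCsvSketch` is a hypothetical class, not Flink's parser, and the issue does not state which behavior Flink should adopt.

```java
import java.util.ArrayList;
import java.util.List;

final class QuotedCsvSketch {
    /** Splits one line on commas; inside a quoted field, "" yields a literal quote. */
    static List<String> parseLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (inQuotes) {
                if (c == '"') {
                    if (i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        current.append('"'); // two consecutive quotes: one literal quote
                        i++;                 // skip the second quote of the pair
                    } else {
                        inQuotes = false;    // single quote: end of the quoted section
                    }
                } else {
                    current.append(c);
                }
            } else if (c == '"') {
                inQuotes = true;
            } else if (c == ',') {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }
}
```

Under that assumption, the line `1,"say ""hi""",x` has three fields, and the second one reads `say "hi"`.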

> Flink string parser doesn't handle string fields containing two consecutive 
> double quotes
> -
>
> Key: FLINK-4785
> URL: https://issues.apache.org/jira/browse/FLINK-4785
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataSet
>Affects Versions: 1.1.2
>Reporter: Flavio Pompermaier
>Priority: Major
>  Labels: csv
>
> To reproduce the error run 
> https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/flink/Csv2RowExample.java





[GitHub] [flink] pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-08 Thread GitBox
pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r272927782
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/core/plugin/DirectoryBasedPluginDescriptorsFactoryTest.java
 ##
 @@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Test for {@link DirectoryBasedPluginDescriptorsFactory}.
+ */
+public class DirectoryBasedPluginDescriptorsFactoryTest {
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   @Test
+   public void createPluginDescriptorsForDirectory() throws Exception {
+   File rootFolder = temporaryFolder.newFolder();
+   DirectoryBasedPluginDescriptorsFactory descriptorsFactory =
+   new 
DirectoryBasedPluginDescriptorsFactory(rootFolder.toPath());
+   Collection<PluginDescriptor> actual = 
descriptorsFactory.createPluginDescriptors();
+
+   // empty root dir -> no actual
 
 Review comment:
   nit: move those comments to the assertion message?




[GitHub] [flink] pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-08 Thread GitBox
pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r272925609
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginLoader.java
 ##
 @@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.ChildFirstClassLoader;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+/**
+ * A {@link PluginLoader} is used by the {@link PluginManager} to load a 
single plugin. It is essentially a combination
+ * of a {@link ChildFirstClassLoader} and {@link ServiceLoader}. This class 
can locate and load service implementations
+ * from the plugin for a given SPI. The {@link PluginDescriptor}, which among 
other information contains the resource
+ * URLs, is provided at construction.
+ */
+@ThreadSafe
+public class PluginLoader {
+
+   /** Classloader which is used to load the plugin classes. We expect 
this classloader is thread-safe.*/
+   private final ClassLoader pluginClassLoader;
+
+   @VisibleForTesting
+   public PluginLoader(PluginDescriptor pluginDescriptor, ClassLoader 
parentClassLoader) {
+   this.pluginClassLoader =
+   new ChildFirstClassLoader(
+   pluginDescriptor.getPluginResourceURLs(),
+   parentClassLoader,
+   pluginDescriptor.getLoaderExcludePatterns());
+   }
+
+   /**
+* Returns an iterator over all available implementations of the given 
service interface (SPI) for the plugin.
+*
+* @param service the service interface (SPI) for which implementations 
are requested.
+* @param <P> Type of the requested plugin service.
+* @return An iterator of all implementations of the given service 
interface that could be loaded from the plugin.
+*/
+   public <P> Iterator<P> load(Class<P> service) {
+   final ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
+   try {
+   
Thread.currentThread().setContextClassLoader(pluginClassLoader);
+   return new ContextClassLoaderSettingIterator<>(
+   ServiceLoader.load(service, 
pluginClassLoader).iterator(),
+   pluginClassLoader);
+   } finally {
+   
Thread.currentThread().setContextClassLoader(contextClassLoader);
+   }
+   }
+
+   /**
+* Wrapper for the service iterator. The wrapper will set/unset the 
context classloader to the plugin classloader
+* around the point where elements are returned.
+*
+* @param <P> type of the iterated plugin element.
+*/
+   static class ContextClassLoaderSettingIterator<P> 
implements Iterator<P> {
+
+   private final Iterator<P> delegate;
+   private final ClassLoader pluginClassLoader;
+
+   ContextClassLoaderSettingIterator(Iterator<P> delegate, 
ClassLoader pluginClassLoader) {
+   this.delegate = delegate;
+   this.pluginClassLoader = pluginClassLoader;
+   }
+
+   @Override
+   public boolean hasNext() {
+   return delegate.hasNext();
+   }
+
+   @Override
+   public P next() {
+   final ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
 
 Review comment:
   I would wrap this setting and resetting logic into a `Closeable` class and 
re-use it here and in the `load` method.
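
One possible shape for such a helper is sketched below. This is only an illustration of the idea, not code from the pull request: the class name `ContextClassLoaderScope` and its `enter` method are hypothetical, and the sketch implements `AutoCloseable` with a `close()` that declares no checked exception, so callers need no `catch` block.

```java
/**
 * Hypothetical helper: swaps the current thread's context classloader on
 * creation and restores the previous one on close().
 */
final class ContextClassLoaderScope implements AutoCloseable {

    private final Thread thread;
    private final ClassLoader original;

    private ContextClassLoaderScope(Thread thread, ClassLoader original) {
        this.thread = thread;
        this.original = original;
    }

    /** Installs the given classloader and remembers the one it replaces. */
    static ContextClassLoaderScope enter(ClassLoader temporary) {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(temporary);
        return new ContextClassLoaderScope(thread, original);
    }

    /** Restores the classloader that was active before enter() was called. */
    @Override
    public void close() {
        thread.setContextClassLoader(original);
    }

    public static void main(String[] args) {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        ClassLoader temporary = new java.net.URLClassLoader(new java.net.URL[0], before);
        try (ContextClassLoaderScope ignored = ContextClassLoaderScope.enter(temporary)) {
            System.out.println(Thread.currentThread().getContextClassLoader() == temporary);
        }
        System.out.println(Thread.currentThread().getContextClassLoader() == before);
    }
}
```

Both `load` and `next()` could then replace their try/finally blocks with a try-with-resources statement around the delegated call.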



[GitHub] [flink] pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-08 Thread GitBox
pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r272927404
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/PluginManagerSingleton.java
 ##
 @@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+/**
+ * Singleton instance provider of {@link PluginManager}. This class must be 
initialized before usage, ideally at the
+ * entry-point of each process that wants to use it.
+ */
+public final class PluginManagerSingleton {
 
 Review comment:
   nit: maybe it's worth merging the content of this class into `PluginManager`? 
If you prefer to keep them separate, maybe add a Javadoc link to 
`PluginManagerSingleton` somewhere in `PluginManager`?




[GitHub] [flink] pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-08 Thread GitBox
pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r272923859
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginDescriptorsFactory.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.PathMatcher;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used to create a collection of {@link PluginDescriptor} based 
on directory structure for a given plugin
+ * root folder.
+ *
+ * The expected structure is as follows: the given plugins root folder, 
containing the plugins folder. One plugin folder
+ * contains all resources (jar files) belonging to a plugin. The name of the 
plugin folder becomes the plugin id.
+ * 
+ * plugins-root-folder/
+ *  |plugin-a/ (folder of plugin a)
+ *  ||-plugin-a-1.jar (the jars containing the 
classes of plugin a)
+ *  ||-plugin-a-2.jar
+ *  ||-...
+ *  |
+ *  |plugin-b/
+ *  ||-plugin-b-1.jar
+ * ...   |-...
+ * 
+ */
+public class DirectoryBasedPluginDescriptorsFactory {
 
 Review comment:
   Hmmm, what about `PluginDiscoverer`? `PluginFinder`? 
`PluginDescriptorsDiscoverer`?
   
   I'm not a fan of `Factory` in the name here, since this class is not only 
responsible for the construction of `PluginDescriptors` but also for 
discovering/finding them. Also, I'm not sure there is a point in 
exposing/hardcoding `DirectoryBased` in the name.




[GitHub] [flink] pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce Plugin/Loading system and integrate it with FileSystem

2019-04-08 Thread GitBox
pnowojski commented on a change in pull request #8038: [FLINK-11953] Introduce 
Plugin/Loading system and integrate it with FileSystem
URL: https://github.com/apache/flink/pull/8038#discussion_r272921337
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/plugin/DirectoryBasedPluginDescriptorsFactory.java
 ##
 @@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.plugin;
+
+import org.apache.flink.util.function.FunctionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileSystems;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.PathMatcher;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used to create a collection of {@link PluginDescriptor} based 
on directory structure for a given plugin
+ * root folder.
+ *
+ * The expected structure is as follows: the given plugins root folder, 
containing the plugins folder. One plugin folder
+ * contains all resources (jar files) belonging to a plugin. The name of the 
plugin folder becomes the plugin id.
+ * 
+ * plugins-root-folder/
+ *  |plugin-a/ (folder of plugin a)
+ *  ||-plugin-a-1.jar (the jars containing the 
classes of plugin a)
+ *  ||-plugin-a-2.jar
+ *  ||-...
+ *  |
+ *  |plugin-b/
+ *  ||-plugin-b-1.jar
+ * ...   |-...
+ * 
+ */
+public class DirectoryBasedPluginDescriptorsFactory {
+
+   private static final Logger log = 
LoggerFactory.getLogger(DirectoryBasedPluginDescriptorsFactory.class);
+
+   private static final PathMatcher JAR_FILE_MATCHER =
+   FileSystems.getDefault().getPathMatcher("glob:**.jar");
+
+   private final Path pluginsRootDir;
+
+   public DirectoryBasedPluginDescriptorsFactory(Path pluginsRootDir) {
+   this.pluginsRootDir = pluginsRootDir;
+   }
+
+   //TODO this can go into an interface if there would be more strategies 
in the future.
+   public Collection<PluginDescriptor> createPluginDescriptors() throws 
IOException {
+   return Files.list(pluginsRootDir)
+   .filter((Path path) -> Files.isDirectory(path))
+   
.map(FunctionUtils.uncheckedFunction(this::createPluginDescriptorForSubDirectory))
+   .filter(Optional::isPresent)
+   .map(Optional::get)
+   .collect(Collectors.toList());
+   }
+
+   private Optional<PluginDescriptor> 
createPluginDescriptorForSubDirectory(Path subDirectory) {
+   Optional<URL[]> jarURLsFromDirectory = 
createJarURLsFromDirectory(subDirectory);
+   if (jarURLsFromDirectory.isPresent()) {
+   URL[] urls = jarURLsFromDirectory.get();
+   // we sort the urls for the benefit of having a 
stable/reproducible order of jars.
+   Arrays.sort(urls, Comparator.comparing(URL::toString));
+   //TODO: This class could be extended to parse 
exclude-pattern from a optional text files in the plugin directories.
+   return Optional.of(
+   new PluginDescriptor(
+   subDirectory.getFileName().toString(),
+   urls,
+   new String[0]));
+   } else {
+   return Optional.empty();
+   }
+   }
+
+   private static Optional<URL[]> createJarURLsFromDirectory(Path 
subDirectory) {
+   URL[] urls = null;
+   try {
+   urls = Files.list(subDirectory)
+   .filter((Path p) -> Files.isRegularFile(p) && 
JAR_FILE_MATCHER.matches(p))
+   .map(FunctionUtils.uncheckedFunction((Path p) 
-> p.toUri().toURL()))
+ 

[jira] [Commented] (FLINK-10684) Improve the CSV reading process

2019-04-08 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812221#comment-16812221
 ] 

Fabian Hueske commented on FLINK-10684:
---

Hi, please have a look at https://issues.apache.org/jira/browse/FLINK-7050 and 
the corresponding PR [https://github.com/apache/flink/pull/4660] as well.

IMO, the current CSV InputFormat should be deprecated at some point because it 
is not standards-compliant and is based on a lot of custom code.

> Improve the CSV reading process
> ---
>
> Key: FLINK-10684
> URL: https://issues.apache.org/jira/browse/FLINK-10684
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataSet
>Reporter: Xingcan Cui
>Priority: Major
>
> CSV is one of the most commonly used file formats in data wrangling. To load 
> records from CSV files, Flink has provided the basic {{CsvInputFormat}}, as 
> well as some variants (e.g., {{RowCsvInputFormat}} and 
> {{PojoCsvInputFormat}}). However, it seems that the reading process can be 
> improved. For example, we could add a built-in util to automatically infer 
> schemas from CSV headers and samples of data. Also, the current bad record 
> handling method can be improved by somehow keeping the invalid lines (and 
> even the reasons for failed parsing), instead of logging the total number 
> only.
> This is an umbrella issue for all the improvements and bug fixes for the CSV 
> reading process.





[jira] [Commented] (FLINK-8801) S3's eventual consistent read-after-write may fail yarn deployment of resources to S3

2019-04-08 Thread Till Rohrmann (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812220#comment-16812220
 ] 

Till Rohrmann commented on FLINK-8801:
--

Hi [~Tao Yang] and [~yanyan300300], thanks for reporting this issue. You are 
correct that some of the {{FileSystems}} leave the {{setTimes}} implementation 
as a no-op. The idea behind this fix was to avoid eventual consistency 
problems. With your proposed change, [~yanyan300300] and [~Tao Yang], you might 
run into the problem this issue tried to fix, namely that the file does not yet 
exist when its modification timestamp is read (depending on the file system 
implementation).

However, the current state does not seem to fully fix the problem. [~NicoK], 
with which file systems did we test this change? Could a workaround be to 
revert this change and add a retry loop if {{FileSystem#getFileStatus}} fails 
with {{FileNotFoundException}}?
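
A retry loop of this kind could look roughly like the following sketch. It is deliberately generic and makes several assumptions: `RetryOnFileNotFound`, `IOCall`, `maxAttempts` and `backoffMillis` are hypothetical names, and the lookup is passed in as a lambda rather than calling the real {{FileSystem#getFileStatus}}.

```java
import java.io.FileNotFoundException;
import java.io.IOException;

final class RetryOnFileNotFound {

    /** Hypothetical functional interface for an I/O call that returns a value. */
    @FunctionalInterface
    interface IOCall<T> {
        T call() throws IOException;
    }

    /**
     * Runs the call and retries only when it fails with FileNotFoundException,
     * sleeping between attempts. Any other IOException is propagated at once.
     * The last FileNotFoundException is rethrown once all attempts are used up.
     */
    static <T> T retry(IOCall<T> call, int maxAttempts, long backoffMillis)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        FileNotFoundException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.call();
            } catch (FileNotFoundException e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis);
                }
            }
        }
        throw last;
    }
}
```

A caller would wrap the status lookup in a lambda, for example `retry(() -> lookupStatus(path), 5, 200L)`, where `lookupStatus` is again a placeholder for whatever call reads the remote file status. Bounding the number of attempts keeps a genuinely missing file from blocking deployment indefinitely.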

> S3's eventual consistent read-after-write may fail yarn deployment of 
> resources to S3
> -
>
> Key: FLINK-8801
> URL: https://issues.apache.org/jira/browse/FLINK-8801
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / YARN, FileSystems, Runtime / Coordination
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.3, 1.5.0
>
>
> According to 
> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel:
> {quote}
> Amazon S3 provides read-after-write consistency for PUTS of new objects in 
> your S3 bucket in all regions with one caveat. The caveat is that if you make 
> a HEAD or GET request to the key name (to find if the object exists) before 
> creating the object, Amazon S3 provides eventual consistency for 
> read-after-write.
> {quote}
> Some S3 file system implementations may actually execute such a request for 
> the about-to-write object and thus the read-after-write is only eventually 
> consistent. {{org.apache.flink.yarn.Utils#setupLocalResource()}} currently 
> relies on a consistent read-after-write since it accesses the remote resource 
> to get file size and modification timestamp. Since there we have access to 
> the local resource, we can use the data from there instead and circumvent the 
> problem.





[GitHub] [flink] JingsongLi commented on issue #8102: [FLINK-12087][table-runtime-blink] Introduce over window operators to blink batch

2019-04-08 Thread GitBox
JingsongLi commented on issue #8102: [FLINK-12087][table-runtime-blink] 
Introduce over window operators to blink batch
URL: https://github.com/apache/flink/pull/8102#issuecomment-480731189
 
 
   I tried to implement it so that it "immediately determines the left and right 
boundary", but even if we determine the boundaries immediately, we still need 
to take the records one by one for the calculation, so the final implementation 
is basically the same as the current one.
   Could you give me more details so that I can better understand the 
implementation you have in mind?




[jira] [Updated] (FLINK-12010) Kinesis Producer problems on Alpine Linux image

2019-04-08 Thread Mario Georgiev (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-12010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mario Georgiev updated FLINK-12010:
---
Description: 
{code:java}
Error relocating 
/tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
 sys_siglist: symbol not found
Error relocating 
/tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
 backtrace_symbols_fd: symbol not found
Error relocating 
/tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
 backtrace: symbol not found
Error relocating 
/tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
 __strtok_r
Error relocating 
/tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
 __rawmemchr: symbol not found
{code}
When building flink from source and using alpine linux, it appears that 
KinesisProducer does not really like Alpine Linux. Any ideas? 

 

 

 

  was:
{code:java}
Error relocating 
/tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
 sys_siglist: symbol not found
Error relocating 
/tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
 backtrace_symbols_fd: symbol not found
Error relocating 
/tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
 backtrace: symbol not found
Error relocating 
/tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
 __strtok_r
Error relocating 
/tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
 __rawmemchr: symbol not found
{code}

When building flink from source and using alpine linux, it appears that 
KinesisProducer does not really like Linux. Any ideas? 

 

 

 


> Kinesis Producer problems on Alpine Linux image
> ---
>
> Key: FLINK-12010
> URL: https://issues.apache.org/jira/browse/FLINK-12010
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kinesis
>Affects Versions: 1.7.2
>Reporter:  Mario Georgiev
>Priority: Major
>
> {code:java}
> Error relocating 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
>  sys_siglist: symbol not found
> Error relocating 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
>  backtrace_symbols_fd: symbol not found
> Error relocating 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
>  backtrace: symbol not found
> Error relocating 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
>  __strtok_r
> Error relocating 
> /tmp/amazon-kinesis-producer-native-binaries/kinesis_producer_489FA9AC71B1CD61A4002E9F16A279556D581D9D:
>  __rawmemchr: symbol not found
> {code}
> When building flink from source and using alpine linux, it appears that 
> KinesisProducer does not really like Alpine Linux. Any ideas? 
>  
>  
>  





[jira] [Updated] (FLINK-11855) Race condition in EmbeddedLeaderService

2019-04-08 Thread Aljoscha Krettek (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-11855:
-
Fix Version/s: (was: 1.8.1)
   (was: 1.9.0)
   1.8.0

> Race condition in EmbeddedLeaderService
> ---
>
> Key: FLINK-11855
> URL: https://issues.apache.org/jira/browse/FLINK-11855
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Coordination
>Affects Versions: 1.7.2, 1.8.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.3, 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> There is a race condition in the {{EmbeddedLeaderService}} which can occur if 
> the {{EmbeddedLeaderService}} is shut down before the {{GrantLeadershipCall}} 
> has been executed. In this case, the {{contender}} is nulled which leads to a 
> NPE.





[GitHub] [flink] zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] Refactor the constructor of NetworkEnvironment

2019-04-08 Thread GitBox
zhijiangW commented on a change in pull request #8090: [FLINK-12067][network] 
Refactor the constructor of NetworkEnvironment
URL: https://github.com/apache/flink/pull/8090#discussion_r272919268
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NetworkEnvironmentConfiguration.java
 ##
 @@ -105,6 +106,302 @@ public NettyConfig nettyConfig() {
return nettyConfig;
}
 
+   public boolean isCreditBased() {
+   return isCreditBased;
+   }
+
+   // 

+
+   /**
+* Utility method to extract network related parameters from the 
configuration and to
+* sanity check them.
+*
+* @param configuration configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size (in bytes)
+* @param localTaskManagerCommunication true, to skip initializing the 
network stack
+* @param taskManagerAddress identifying the IP address under which the 
TaskManager will be accessible
+* @return NetworkEnvironmentConfiguration
+*/
+   @Deprecated
+   public static NetworkEnvironmentConfiguration fromConfiguration(
+   Configuration configuration,
+   long maxJvmHeapMemory,
+   boolean localTaskManagerCommunication,
+   InetAddress taskManagerAddress) {
+
+   // > hosts / ports for communication and data exchange
+
+   final int dataport = 
configuration.getInteger(TaskManagerOptions.DATA_PORT);
+   ConfigurationParserUtils.checkConfigParameter(dataport >= 0, 
dataport, TaskManagerOptions.DATA_PORT.key(),
+   "Leave config parameter empty or use 0 to let the 
system choose a port automatically.");
+
+   final int pageSize = 
ConfigurationParserUtils.getPageSize(configuration);
+
+   final int numNetworkBuffers;
+   if (!hasNewNetworkConfig(configuration)) {
+   // fallback: number of network buffers
+   numNetworkBuffers = 
configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
+
+   checkOldNetworkConfig(numNetworkBuffers);
+   } else {
+   if 
(configuration.contains(TaskManagerOptions.NETWORK_NUM_BUFFERS)) {
+   LOG.info("Ignoring old (but still present) 
network buffer configuration via {}.",
+   
TaskManagerOptions.NETWORK_NUM_BUFFERS.key());
+   }
+
+   final long networkMemorySize = 
calculateNewNetworkBufferMemory(configuration, maxJvmHeapMemory);
+
+   // tolerate offcuts between intended and allocated 
memory due to segmentation (will be available to the user-space memory)
+   long numNetworkBuffersLong = networkMemorySize / 
pageSize;
+   if (numNetworkBuffersLong > Integer.MAX_VALUE) {
+   throw new IllegalArgumentException("The given 
number of memory bytes (" + networkMemorySize
+   + ") corresponds to more than MAX_INT 
pages.");
+   }
+   numNetworkBuffers = (int) numNetworkBuffersLong;
+   }
+
+   final NettyConfig nettyConfig;
+   if (!localTaskManagerCommunication) {
+   final InetSocketAddress taskManagerInetSocketAddress = 
new InetSocketAddress(taskManagerAddress, dataport);
+
+   nettyConfig = new 
NettyConfig(taskManagerInetSocketAddress.getAddress(), 
taskManagerInetSocketAddress.getPort(),
+   pageSize, 
ConfigurationParserUtils.getSlot(configuration), configuration);
+   } else {
+   nettyConfig = null;
+   }
+
+   int initialRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_INITIAL);
+   int maxRequestBackoff = 
configuration.getInteger(TaskManagerOptions.NETWORK_REQUEST_BACKOFF_MAX);
+
+   int buffersPerChannel = 
configuration.getInteger(TaskManagerOptions.NETWORK_BUFFERS_PER_CHANNEL);
+   int extraBuffersPerGate = 
configuration.getInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);
+
+   boolean isCreditBased = 
configuration.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL);
 
 Review comment:
   I also do not like having two sources for `isCreditBased`. 
   
   I think the key question is whether `isCreditBased` must exist together with 
`NettyConfig`. My previous thought was to decouple `isCreditBased` from 
`NettyConfig`. 
   
   We know `NettyConfig` is optional currently in 
`NetworkEnvironmentConfiguration`, and most of the existing tests should verify 
the default behavior (`is

[GitHub] [flink] flinkbot edited a comment on issue #8078: [FLINK-12066] [State Backends] Remove StateSerializerProvider field in keyed state backends

2019-04-08 Thread GitBox
flinkbot edited a comment on issue #8078: [FLINK-12066] [State Backends] Remove 
StateSerializerProvider field in keyed state backends
URL: https://github.com/apache/flink/pull/8078#issuecomment-477977949
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Review Progress
   
   * ✅ 1. The [description] looks good.
   - Approved by @tzulitai [PMC]
   * ✅ 2. There is [consensus] that the contribution should go into Flink.
   - Approved by @tzulitai [PMC]
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/reviewing-prs.html) for a full explanation of 
the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
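
As an illustration of how the approval commands listed above map onto review aspects, here is a minimal sketch. It is not the bot's actual implementation: the class `BotCommandSketch` and the method `approvedAspects` are hypothetical names, and the sketch assumes that `approve-until` includes the named aspect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of flinkbot: interprets the approval commands.
class BotCommandSketch {
    // Aspect order as given in the command list above.
    static final List<String> ASPECTS =
            Arrays.asList("description", "consensus", "architecture", "quality");

    /** Returns the aspects approved by one comment line; empty for any other command. */
    static List<String> approvedAspects(String line) {
        String[] tokens = line.trim().split("\\s+");
        if (tokens.length < 3 || !tokens[0].equals("@flinkbot")) {
            return new ArrayList<>();
        }
        String command = tokens[1];
        if (command.equals("approve")) {
            if (tokens[2].equals("all")) {
                return new ArrayList<>(ASPECTS);
            }
            // "approve" accepts one or more aspect names.
            List<String> result = new ArrayList<>();
            for (int i = 2; i < tokens.length; i++) {
                if (ASPECTS.contains(tokens[i])) {
                    result.add(tokens[i]);
                }
            }
            return result;
        }
        if (command.equals("approve-until") && ASPECTS.contains(tokens[2])) {
            // Assumption: "until" is inclusive of the named aspect.
            return new ArrayList<>(ASPECTS.subList(0, ASPECTS.indexOf(tokens[2]) + 1));
        }
        // "attention" and "disapprove" approve nothing.
        return new ArrayList<>();
    }
}
```

Under these assumptions, `@flinkbot approve-until architecture` would cover `description`, `consensus` and `architecture`, while `@flinkbot disapprove architecture` yields no approvals.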
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tzulitai commented on issue #8078: [FLINK-12066] [State Backends] Remove StateSerializerProvider field in keyed state backends

2019-04-08 Thread GitBox
tzulitai commented on issue #8078: [FLINK-12066] [State Backends] Remove 
StateSerializerProvider field in keyed state backends
URL: https://github.com/apache/flink/pull/8078#issuecomment-480712590
 
 
   @flinkbot approve description
   @flinkbot approve consensus




[jira] [Commented] (FLINK-8033) Build Flink with JDK 9

2019-04-08 Thread Takanobu Asanuma (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812167#comment-16812167
 ] 

Takanobu Asanuma commented on FLINK-8033:
-

bq. because you still have to solve most of the issues you encounter with java 
9 (and 10) on the way to 11

There was an exception to this in Hadoop. HADOOP-15905 is a bug in JDK 9/10 
that has been fixed in JDK 11. Such a problem is difficult to handle because 
JDK 9/10 have reached EOL. That's why we closed HADOOP-11123 (supporting JDK 9) 
as a duplicate of/superseded by HADOOP-15338 (supporting JDK 11).

> Build Flink with JDK 9
> --
>
> Key: FLINK-8033
> URL: https://issues.apache.org/jira/browse/FLINK-8033
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou
>Assignee: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
>
> This is a JIRA to track all issues found while making Flink compatible with 
> Java 9.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3991) Remove deprecated configuration keys from ConfigConstants

2019-04-08 Thread LiuJi (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16812161#comment-16812161
 ] 

LiuJi commented on FLINK-3991:
--

Hi [~Zentol], it seems that some deprecated keys in ConfigConstants no longer 
have any usages. Do you think now is a good time to remove them? If so, I would 
like to work on it.

> Remove deprecated configuration keys from ConfigConstants
> -
>
> Key: FLINK-3991
> URL: https://issues.apache.org/jira/browse/FLINK-3991
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Reporter: Robert Metzger
>Priority: Major
> Fix For: 2.0.0
>
>
> In 
> https://github.com/apache/flink/commit/b0acd97935cd21843bac3b9b5afa3662b52bb95d#diff-40616c4678c3fbfe07c0701505ce0567
>  I deprecated some configuration keys.
> They are unused and need to be removed with the 2.0 release.


