[GitHub] lamber-ken commented on issue #7296: [hotfix][web] fix the desc about restart strategy in web

2018-12-14 Thread GitBox
lamber-ken commented on issue #7296: [hotfix][web] fix the desc about restart 
strategy in web
URL: https://github.com/apache/flink/pull/7296#issuecomment-447544430
 
 
   hi @GJL, cc, thanks


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11116) Overwrite outdated in-progress files in StreamingFileSink.

2018-12-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11116:
---
Labels: pull-request-available  (was: )

> Overwrite outdated in-progress files in StreamingFileSink.
> --
>
> Key: FLINK-11116
> URL: https://issues.apache.org/jira/browse/FLINK-11116
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.2
>
>
> In order to guarantee exactly-once semantics, the streaming file sink 
> implements a two-phase commit protocol when writing files to the filesystem.
> Initially, data is written to in-progress files. These files are then put into 
> "pending" state when they are completed (based on the rolling policy), and 
> they are finally committed when the checkpoint that put them in the "pending" 
> state is acknowledged as complete.
> Given the above, if we have:
> 1) checkpoints A, B, C coming, 
> 2) checkpoint A being acknowledged, and 
> 3) a failure,
> then we may have files that do not belong to any checkpoint (because B and C 
> were not considered successful). These files are currently not cleaned up.
> In order to reduce the number of such files created, we removed the random 
> suffix from in-progress temporary files, so that the next in-progress file 
> that is opened for this part overwrites them.
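For illustration only, here is a minimal, self-contained sketch (a hypothetical file-naming helper, not the actual writer code) of why a deterministic in-progress file name lets the next attempt for the same part counter overwrite the stale file, while a random suffix would leave it orphaned:

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class InProgressNamingSketch {

    // Deterministic name: the same subtask/part counter always maps to the same path.
    static Path inProgressPath(Path bucketDir, int subtask, long partCounter) {
        return bucketDir.resolve(String.format(".part-%d-%d.inprogress", subtask, partCounter));
    }

    public static void main(String[] args) throws IOException {
        Path bucket = Files.createTempDirectory("bucket");
        Path inProgress = inProgressPath(bucket, 0, 3);

        // First attempt writes some data and then "fails" before committing.
        Files.write(inProgress, "data from the failed attempt".getBytes());

        // After recovery, the task reopens the same part counter; because the name
        // is deterministic, the new write simply replaces the stale in-progress file.
        Files.write(inProgress, "data from the recovered attempt".getBytes());

        System.out.println(Files.readAllLines(inProgress));
    }
}
{code}

With a random suffix appended to the name, the second write would create a new file and the first one would remain behind as an orphan.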



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] kl0u opened a new pull request #7313: [FLINK-11116][fs-connector] Removed randomness from HDFS and Local fs writers.

2018-12-14 Thread GitBox
kl0u opened a new pull request #7313: [FLINK-11116][fs-connector] Removed 
randomness from HDFS and Local fs writers.
URL: https://github.com/apache/flink/pull/7313
 
 
   ## What is the purpose of the change
   
   As described in the related JIRA, this PR targets the in-progress files left 
after a failure that belong to no checkpoint. In this case, we lazily 
overwrite them as soon as the new tasks arrive at that part counter.
   
   ## Brief change log
   
   Just removes the random suffix from the `LocalRecoverableWriter` and the 
`HadoopRecoverableWriter` so that the temporary files are overwritten.
   
   ## Verifying this change
   
   Added 2 tests in the `AbstractRecoverableWriterTest`.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   
   
   R @aljoscha 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-11169) [hotfix][runtime] fix the problem of not being reloaded for jobmanager's log after clicking the refresh button in web ui

2018-12-14 Thread Jiayong Mo (JIRA)
Jiayong Mo created FLINK-11169:
--

 Summary:  [hotfix][runtime] fix the problem of not being reloaded 
for jobmanager's log after clicking the refresh button in web ui
 Key: FLINK-11169
 URL: https://issues.apache.org/jira/browse/FLINK-11169
 Project: Flink
  Issue Type: Bug
  Components: REST
Affects Versions: 1.7.0, 1.6.2
Reporter: Jiayong Mo


In the Flink web UI, we can access the jobmanager's log from the tab under its 
corresponding page, but we cannot reload the log (the content is not up to 
date) because the log file has been cached for 300s on the local client's 
disk. This problem occurs because of the incorrect way of checking whether the 
file should be cached. 
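For illustration, a minimal sketch of the kind of check involved (hypothetical helper, not the actual Flink handler code): whether a served file may carry a browser-cache header should depend on whether it is a log/out file, so that those are always fetched fresh:

{code:java}
// Illustrative only: decide whether a served static file may be cached by the
// browser. Log and .out files must always be re-fetched so the UI stays current.
public final class CachePolicySketch {

    private static final long CACHE_SECONDS = 300;

    static boolean isLogOrOutFile(String fileName) {
        return fileName.endsWith(".log") || fileName.endsWith(".out");
    }

    /** Returns the Cache-Control header value for the given file name. */
    static String cacheControlFor(String fileName) {
        return isLogOrOutFile(fileName) ? "no-cache" : "max-age=" + CACHE_SECONDS;
    }

    public static void main(String[] args) {
        System.out.println(cacheControlFor("jobmanager.log")); // no-cache
        System.out.println(cacheControlFor("index.html"));     // max-age=300
    }
}
{code}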



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11169) fix the problem of not being reloaded for jobmanager's log after clicking the refresh button in web ui

2018-12-14 Thread Jiayong Mo (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiayong Mo updated FLINK-11169:
---
Summary: fix the problem of not being reloaded for jobmanager's log after 
clicking the refresh button in web ui  (was:  [hotfix][runtime] fix the problem 
of not being reloaded for jobmanager's log after clicking the refresh button in 
web ui)

> fix the problem of not being reloaded for jobmanager's log after clicking the 
> refresh button in web ui
> --
>
> Key: FLINK-11169
> URL: https://issues.apache.org/jira/browse/FLINK-11169
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Jiayong Mo
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In the Flink web UI, we can access the jobmanager's log from the tab under 
> its corresponding page, but we cannot reload the log (the content is not up 
> to date) because the log file has been cached for 300s on the local client's 
> disk. This problem occurs because of the incorrect way of checking whether 
> the file should be cached. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11169) [hotfix][runtime] fix the problem of not being reloaded for jobmanager's log after clicking the refresh button in web ui

2018-12-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11169:
---
Labels: pull-request-available  (was: )

>  [hotfix][runtime] fix the problem of not being reloaded for jobmanager's log 
> after clicking the refresh button in web ui
> -
>
> Key: FLINK-11169
> URL: https://issues.apache.org/jira/browse/FLINK-11169
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.6.2, 1.7.0
>Reporter: Jiayong Mo
>Priority: Major
>  Labels: pull-request-available
>
> In the Flink web UI, we can access the jobmanager's log from the tab under 
> its corresponding page, but we cannot reload the log (the content is not up 
> to date) because the log file has been cached for 300s on the local client's 
> disk. This problem occurs because of the incorrect way of checking whether 
> the file should be cached. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] AlphaGarden opened a new pull request #7312: [FLINK-11169][runtime] fix the problem of not being reloaded for jobmanager's…

2018-12-14 Thread GitBox
AlphaGarden opened a new pull request #7312:  [FLINK-11169][runtime] fix the 
problem of not being reloaded for jobmanager's…
URL: https://github.com/apache/flink/pull/7312
 
 
   ## What is the purpose of the change
   
   *In the Flink web UI, we can access the jobmanager's log from the tab under 
its corresponding page, but we cannot reload the log (the content is not up to 
date) because the log file has been cached for 300s on the local client's 
disk. This problem occurs because of the incorrect way of checking whether the 
file should be cached.*
   
   ## Brief change log
 - *Change the way of checking whether the current file is a log file or an 
out file.*
   
   ## Verifying this change
   
 - *git clone https://github.com/AlphaGarden/flink.git*
 - *cd flink*
 - *git checkout hotfix-web-reload-jobmanager-log*
 - *mvn clean package -DskipTests*
 - *run the ./start-cluster.sh script; then the log for the jobmanager should 
be reloaded from the backend every time we click the refresh button in the Flink Web UI*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (yes)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
 - Does this pull request introduce a new feature? (no)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-9717) Flush state of one side of the join if other side is bounded

2018-12-14 Thread boshu Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

boshu Zheng reassigned FLINK-9717:
--

Assignee: (was: boshu Zheng)

> Flush state of one side of the join if other side is bounded
> 
>
> Key: FLINK-9717
> URL: https://issues.apache.org/jira/browse/FLINK-9717
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> Whenever one side of a join receives {{MAX_WATERMARK}}, the join (both normal 
> and versioned joins) could flush the state kept for the other side.
> This is a highly useful optimisation that would speed up versioned joins and 
> would allow normal joins of large unbounded streams with bounded tables (for 
> example some static data).
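A conceptual, plain-Java sketch of the idea (not the Flink operator API; class and method names are made up): once one input reports the maximal watermark, i.e. it is bounded and finished, the records that were buffered from the other side only to match against that input's future records can be flushed:

{code:java}
import java.util.ArrayList;
import java.util.List;

public class BoundedSideJoinSketch {

    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final List<String> leftState = new ArrayList<>();
    private final List<String> rightState = new ArrayList<>();

    void onLeftRecord(String record)  { leftState.add(record); }
    void onRightRecord(String record) { rightState.add(record); }

    void onLeftWatermark(long watermark) {
        if (watermark == MAX_WATERMARK) {
            // The left side is exhausted: right-side records were only buffered to
            // join against future left records, so that state can now be flushed.
            System.out.println("Flushing right-side state: " + rightState);
            rightState.clear();
        }
    }

    public static void main(String[] args) {
        BoundedSideJoinSketch join = new BoundedSideJoinSketch();
        join.onRightRecord("static-table-row");
        join.onLeftWatermark(MAX_WATERMARK);
    }
}
{code}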



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded

2018-12-14 Thread boshu Zheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722013#comment-16722013
 ] 

boshu Zheng commented on FLINK-9717:


Just a little thought to share: when considering joins with bounded data, I 
would prefer the side-input solution, in which there is no `MAX_WATERMARK`, if 
I understand correctly.

> Flush state of one side of the join if other side is bounded
> 
>
> Key: FLINK-9717
> URL: https://issues.apache.org/jira/browse/FLINK-9717
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: boshu Zheng
>Priority: Major
>
> Whenever one side of a join receives {{MAX_WATERMARK}}, the join (both normal 
> and versioned joins) could flush the state kept for the other side.
> This is a highly useful optimisation that would speed up versioned joins and 
> would allow normal joins of large unbounded streams with bounded tables (for 
> example some static data).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded

2018-12-14 Thread boshu Zheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16722001#comment-16722001
 ] 

boshu Zheng commented on FLINK-9717:


Hi [~pnowojski], thanks for your comment. I haven't dug into this issue yet, 
so feel free to take it over :)

> Flush state of one side of the join if other side is bounded
> 
>
> Key: FLINK-9717
> URL: https://issues.apache.org/jira/browse/FLINK-9717
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: boshu Zheng
>Priority: Major
>
> Whenever one side of a join receives {{MAX_WATERMARK}}, the join (both normal 
> and versioned joins) could flush the state kept for the other side.
> This is a highly useful optimisation that would speed up versioned joins and 
> would allow normal joins of large unbounded streams with bounded tables (for 
> example some static data).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-11166) Slot Placement Constraint

2018-12-14 Thread Tony Xintong Song (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11166?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tony Xintong Song reassigned FLINK-11166:
-

Assignee: Tony Xintong Song

> Slot Placement Constraint
> -
>
> Key: FLINK-11166
> URL: https://issues.apache.org/jira/browse/FLINK-11166
> Project: Flink
>  Issue Type: New Feature
>  Components: ResourceManager
>Reporter: Tony Xintong Song
>Assignee: Tony Xintong Song
>Priority: Major
>
> In many cases, users may want Flink to schedule their job tasks following 
> certain locality preferences, e.g., colocating upstream/downstream tasks to 
> reduce data transmission costs, dispersing tasks of certain patterns (e.g., 
> I/O intensive) to avoid resource competition, running tasks in exclusive 
> TaskExecutors for task-level resource consumption measurements, etc.
> Currently, there are two ways in Flink to specify such locality preferences: 
> specifying preferred locations in the slot request, or setting a slot sharing 
> group for the task. In both ways the preferences are specified when 
> requesting slots from the SlotPool and can affect how tasks are placed among 
> the slots allocated to the JobMaster.
> However, there is no guarantee that such preferences can always be satisfied, 
> especially when slots are customized with different resource profiles for 
> different kinds of tasks. E.g., in cases where two tasks A and B should 
> preferably be scheduled onto the same TaskExecutor, it is possible that none 
> of the slots customized for A offered to the JobMaster are collocated with 
> slots customized for B.
> To better support locality preferences with various slot resource 
> specifications, it is necessary to allow JobMasters to request slots subject 
> to certain placement constraints from the ResourceManager.
> In addition, most underlying frameworks Flink runs on (Yarn, Kubernetes, 
> Mesos) already have individual support for container-level placement 
> constraints. It is a great opportunity for Flink to leverage such underlying 
> support and enable scheduling tasks with rich locality preferences.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10590) Optional quantifier is discarded from first element of group pattern

2018-12-14 Thread bupt_ljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721978#comment-16721978
 ] 

bupt_ljy commented on FLINK-10590:
--

It seems that we first need to support group patterns; then we will be able to 
address this problem.

> Optional quantifier is discarded from first element of group pattern
> 
>
> Key: FLINK-10590
> URL: https://issues.apache.org/jira/browse/FLINK-10590
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, Table API & SQL
>Reporter: Dawid Wysakowicz
>Priority: Major
>
> In a pattern {{(A? B)+}}, the optional property of {{A}} state is effectively 
> discarded in the compiled state machine.
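For reference, a small sketch of how the affected pattern can be written with the CEP {{Pattern}} API (assuming String events, where "a" matches A and "b" matches B; this is only an illustration of the pattern shape, and according to this ticket the {{optional()}} on A is effectively lost when the group is compiled into the NFA):

{code:java}
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;

public class GroupPatternSketch {

    public static void main(String[] args) {
        // Inner group: A? B
        Pattern<String, String> group = Pattern.<String>begin("A")
            .where(new SimpleCondition<String>() {
                @Override
                public boolean filter(String value) {
                    return value.equals("a");
                }
            })
            .optional()
            .followedBy("B")
            .where(new SimpleCondition<String>() {
                @Override
                public boolean filter(String value) {
                    return value.equals("b");
                }
            });

        // (A? B)+ : the whole group repeated one or more times.
        Pattern<String, String> pattern = Pattern.begin(group).oneOrMore();
        System.out.println(pattern);
    }
}
{code}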



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11088) Improve Kerberos Authentication using Keytab in YARN proxy user mode

2018-12-14 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721857#comment-16721857
 ] 

Rong Rong commented on FLINK-11088:
---

I dug further into the details of the documentation on the Hadoop side, and it 
seems there are 3 recommended ways of distributing credentials to secure 
long-running services on YARN. See here: 
https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services.

I am not sure whether this applies to other cluster resource management 
systems, but I think it is worthwhile to take a look. For one, the current way 
of letting all JMs and TMs renew keytabs with the KDC seems to be a problem. If 
we can have the AM or JM renew with the keytab credential and distribute 
delegation tokens to all TMs, it would relieve a lot of load on the KDC server.

I will start drafting a simple discussion doc if the community thinks this is 
worth digging deeper into. Any thoughts [~till.rohrmann] [~aljoscha] ?
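To make the flow concrete, a rough sketch using Hadoop's `UserGroupInformation`/`Credentials` APIs (not Flink code; principal, keytab path and token file are placeholders, and token renewal/expiry handling is omitted): only the AM/JM authenticates against the KDC, and the TMs consume the shipped delegation tokens:

{code:java}
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;

public class TokenDistributionSketch {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // 1) AM/JM side: authenticate with the keytab (the only KDC interaction).
        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
            "flink/host@EXAMPLE.COM", "/path/to/flink.keytab");

        // 2) Obtain delegation tokens on behalf of the job and persist them for shipping.
        Credentials credentials = new Credentials();
        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
            FileSystem fs = FileSystem.get(conf);
            fs.addDelegationTokens("flink", credentials);
            return null;
        });
        credentials.writeTokenStorageFile(new Path("/tmp/flink-tokens"), conf);

        // 3) TM side: read the shipped tokens instead of contacting the KDC.
        Credentials shipped = Credentials.readTokenStorageFile(new Path("/tmp/flink-tokens"), conf);
        UserGroupInformation.getCurrentUser().addCredentials(shipped);
    }
}
{code}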

> Improve Kerberos Authentication using Keytab in YARN proxy user mode
> 
>
> Key: FLINK-11088
> URL: https://issues.apache.org/jira/browse/FLINK-11088
> Project: Flink
>  Issue Type: Improvement
>  Components: Security, YARN
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, flink-yarn assumes the keytab is shipped as an application master 
> environment local resource on the client side and will be distributed to all 
> the TMs. This does not work for YARN proxy user mode [1], since a proxy user 
> or super user might not have access to the actual users' keytabs, but can 
> request delegation tokens on the users' behalf. 
> Based on the types of security options for long-living YARN services [2], we 
> propose to make the keytab file path discovery configurable depending on the 
> launch mode of the YARN client. 
> Reference: 
> [1] 
> https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html
> [2] 
> https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-11088) Improve Kerberos Authentication using Keytab in YARN proxy user mode

2018-12-14 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721857#comment-16721857
 ] 

Rong Rong edited comment on FLINK-11088 at 12/14/18 11:03 PM:
--

I dug further into the details of the documentation on the Hadoop side, and it 
seems there are 3 recommended ways of distributing credentials to secure 
long-running services on YARN. See here: 
https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services.

I am not sure whether this applies to other cluster resource management 
systems, but I think it is worthwhile to take a look. For one, the current way 
of letting all JMs and TMs renew keytabs with the KDC seems to be a problem. If 
we can have the AM or JM renew with the keytab credential and distribute 
delegation tokens to all TMs, it would relieve a lot of load on the KDC server.

I will start drafting a simple discussion doc if the community thinks this is 
worth digging deeper into. Any thoughts [~suez1224], [~till.rohrmann], [~aljoscha] ?


was (Author: walterddr):
I dug further into the details of the documentation on the Hadoop side, and it 
seems there are 3 recommended ways of distributing credentials to secure 
long-running services on YARN. See here: 
https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services.

I am not sure whether this applies to other cluster resource management 
systems, but I think it is worthwhile to take a look. For one, the current way 
of letting all JMs and TMs renew keytabs with the KDC seems to be a problem. If 
we can have the AM or JM renew with the keytab credential and distribute 
delegation tokens to all TMs, it would relieve a lot of load on the KDC server.

I will start drafting a simple discussion doc if the community thinks this is 
worth digging deeper into. Any thoughts [~till.rohrmann] [~aljoscha] ?

> Improve Kerberos Authentication using Keytab in YARN proxy user mode
> 
>
> Key: FLINK-11088
> URL: https://issues.apache.org/jira/browse/FLINK-11088
> Project: Flink
>  Issue Type: Improvement
>  Components: Security, YARN
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Currently, flink-yarn assumes the keytab is shipped as an application master 
> environment local resource on the client side and will be distributed to all 
> the TMs. This does not work for YARN proxy user mode [1], since a proxy user 
> or super user might not have access to the actual users' keytabs, but can 
> request delegation tokens on the users' behalf. 
> Based on the types of security options for long-living YARN services [2], we 
> propose to make the keytab file path discovery configurable depending on the 
> launch mode of the YARN client. 
> Reference: 
> [1] 
> https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html
> [2] 
> https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html#Securing_Long-lived_YARN_Services



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on a change in pull request #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9

2018-12-14 Thread GitBox
TisonKun commented on a change in pull request #7302: [FLINK-11156] [tests] 
Reconcile powermock with JDK 9
URL: https://github.com/apache/flink/pull/7302#discussion_r241909198
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
 ##
 @@ -89,46 +77,72 @@ public void testCheckpointRecovery() throws Exception {
expectedCheckpointIds.add(1L);
expectedCheckpointIds.add(2L);
 
-   final RetrievableStateHandle 
failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
-   
when(failingRetrievableStateHandle.retrieveState()).thenThrow(new 
IOException("Test exception"));
-
-   final RetrievableStateHandle 
retrievableStateHandle1 = mock(RetrievableStateHandle.class);
-   when(retrievableStateHandle1.retrieveState()).then(
 
 Review comment:
   Thanks for your review @zentol. I will separate the commits, but I wonder 
whether this requires separate pull requests?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9

2018-12-14 Thread GitBox
zentol commented on a change in pull request #7302: [FLINK-11156] [tests] 
Reconcile powermock with JDK 9
URL: https://github.com/apache/flink/pull/7302#discussion_r241907414
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ##
 @@ -128,7 +145,9 @@ public ZooKeeperCompletedCheckpointStore(
// All operations will have the path as root
this.client = client.usingNamespace(client.getNamespace() + 
checkpointsPath);
 
-   this.checkpointsInZooKeeper = new 
ZooKeeperStateHandleStore<>(this.client, stateStorage);
+   this.checkpointsInZooKeeper = (checkpointsInZooKeeper != null) ?
 
 Review comment:
   it's a bit icky, but non-intrusive enough to be acceptable IMO.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9

2018-12-14 Thread GitBox
zentol commented on a change in pull request #7302: [FLINK-11156] [tests] 
Reconcile powermock with JDK 9
URL: https://github.com/apache/flink/pull/7302#discussion_r241906430
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 ##
 @@ -327,7 +327,6 @@ public int exists(String pathInZooKeeper) throws Exception 
{
 * @return True if the state handle could be released
 * @throws Exception If the ZooKeeper operation or discarding the state 
handle fails
 */
-   @Nullable
 
 Review comment:
   unrelated, move into a separate hotfix commit


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9

2018-12-14 Thread GitBox
zentol commented on a change in pull request #7302: [FLINK-11156] [tests] 
Reconcile powermock with JDK 9
URL: https://github.com/apache/flink/pull/7302#discussion_r241907196
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
 ##
 @@ -89,46 +77,72 @@ public void testCheckpointRecovery() throws Exception {
expectedCheckpointIds.add(1L);
expectedCheckpointIds.add(2L);
 
-   final RetrievableStateHandle 
failingRetrievableStateHandle = mock(RetrievableStateHandle.class);
-   
when(failingRetrievableStateHandle.retrieveState()).thenThrow(new 
IOException("Test exception"));
-
-   final RetrievableStateHandle 
retrievableStateHandle1 = mock(RetrievableStateHandle.class);
-   when(retrievableStateHandle1.retrieveState()).then(
 
 Review comment:
   are all these refactorings necessary for the test to run on jdk 9? If not, 
please move them into a separate commit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] TisonKun commented on a change in pull request #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9

2018-12-14 Thread GitBox
TisonKun commented on a change in pull request #7302: [FLINK-11156] [tests] 
Reconcile powermock with JDK 9
URL: https://github.com/apache/flink/pull/7302#discussion_r241907057
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 ##
 @@ -128,7 +145,9 @@ public ZooKeeperCompletedCheckpointStore(
// All operations will have the path as root
this.client = client.usingNamespace(client.getNamespace() + 
checkpointsPath);
 
-   this.checkpointsInZooKeeper = new 
ZooKeeperStateHandleStore<>(this.client, stateStorage);
+   this.checkpointsInZooKeeper = (checkpointsInZooKeeper != null) ?
 
 Review comment:
   I am not super happy with this line. This is why I gave up on this approach 
the first time. I'd like to learn whether this style is OK in Flink, or whether 
we tend to build such a testing constructor in another way.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] TisonKun commented on issue #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9

2018-12-14 Thread GitBox
TisonKun commented on issue #7302: [FLINK-11156] [tests] Reconcile powermock 
with JDK 9
URL: https://github.com/apache/flink/pull/7302#issuecomment-447494914
 
 
   I've updated the pull request; the remaining usage of mockito is to mock 
CuratorFramework. I'd like to hear from @tillrohrmann whether we can test with 
Curator without mockito. I am rethinking the ZooKeeper-based HaService, and 
learning how to test it would be quite helpful.
   
   Thanks for your review @GJL !


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] TisonKun commented on issue #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9

2018-12-14 Thread GitBox
TisonKun commented on issue #7302: [FLINK-11156] [tests] Reconcile powermock 
with JDK 9
URL: https://github.com/apache/flink/pull/7302#issuecomment-447485963
 
 
   @GJL I can do it with a testing constructor. If it is the style that Flink 
would be happy with, I will send a commit for it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11134) Invalid REST API request should not log the full exception in Flink logs

2018-12-14 Thread Matt Dailey (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721778#comment-16721778
 ] 

Matt Dailey commented on FLINK-11134:
-

This also affects Flink 1.5.5.  I got a similar stack trace, also while leaving 
the Flink Web UI open after deleting the job:

{code}
org.apache.flink.runtime.rest.NotFoundException: Job 
faef2e5bc048947fc1bf6d468175935d not found
at 
org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:90)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture.uniExceptionallyStage(CompletableFuture.java:884)
at 
java.util.concurrent.CompletableFuture.exceptionally(CompletableFuture.java:2196)
at 
org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:86)
at 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77)
at 
org.apache.flink.runtime.rest.AbstractHandler.respondAsLeader(AbstractHandler.java:168)
at 
org.apache.flink.runtime.rest.handler.RedirectHandler.lambda$null$0(RedirectHandler.java:139)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could 
not find Flink job (faef2e5bc048947fc1bf6d468175935d)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGatewayFuture(Dispatcher.java:693)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.requestJob(Dispatcher.java:459)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
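For illustration, a minimal sketch of the behaviour the ticket asks for (a hypothetical handler class using SLF4J, not the actual Flink REST code): a request for an unknown job is answered with 404 and logged briefly at DEBUG level, without the full stack trace:

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NotFoundLoggingSketch {

    private static final Logger LOG = LoggerFactory.getLogger(NotFoundLoggingSketch.class);

    static class NotFoundException extends Exception {
        NotFoundException(String message) { super(message); }
    }

    static void handleRequest(String jobId) {
        try {
            throw new NotFoundException("Job " + jobId + " not found");
        } catch (NotFoundException e) {
            // Respond with 404 to the client; keep the server log quiet and concise.
            LOG.debug("Ignoring REST request for unknown job {}: {}", jobId, e.getMessage());
        }
    }

    public static void main(String[] args) {
        handleRequest("faef2e5bc048947fc1bf6d468175935d");
    }
}
{code}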

> Invalid REST API request should not log the full exception in Flink logs
> 
>
> Key: FLINK-11134
> URL: https://issues.apache.org/jira/browse/FLINK-11134
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.7.0
>Reporter: Mark Cho
>Priority: Minor
>
> {code:java}
> 2018-12-11 17:52:19,207 ERROR 
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Exception 
> occurred in REST handler.
> org.apache.flink.runtime.rest.NotFoundException: Job 
> 15d06690e88d309aa1bdbb6ce7c6dcd1 not found
> at 
> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.lambda$handleRequest$1(AbstractExecutionGraphHandler.java:90)
> at 
> 

[GitHub] StefanRRichter commented on issue #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

2018-12-14 Thread GitBox
StefanRRichter commented on issue #7288: [FLINK-9702] Improvement in 
(de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#issuecomment-447403243
 
 
   @azagrebin Thanks for the review. I addressed most comments as suggested. 
Will merge once travis is green.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] StefanRRichter commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

2018-12-14 Thread GitBox
StefanRRichter commented on a change in pull request #7288: [FLINK-9702] 
Improvement in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#discussion_r241839041
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
 ##
 @@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+
+/**
+ * Responsible for serialization of currentKey, currentGroup and namespace.
+ * Will reuse the previous serialized currentKeyed if possible.
+ * @param <K> type of the key.
+ */
+@NotThreadSafe
+@Internal
+class RocksDBSerializedCompositeKeyBuilder<K> {
+
+   /** The serializer for the key. */
+   @Nonnull
+   private final TypeSerializer<K> keySerializer;
+
+   /** The output to write the key into. */
+   @Nonnull
+   private final DataOutputSerializer keyOutView;
+
+   /** The number of Key-group-prefix bytes for the key. */
+   @Nonnegative
+   private final int keyGroupPrefixBytes;
+
+   /** This flag indicates whether the key type has a variable byte size 
in serialization. */
+   private final boolean keySerializerTypeVariableSized;
+
+   /** Mark for the position after the serialized key. */
+   @Nonnegative
+   private int afterKeyMark;
+
+   public RocksDBSerializedCompositeKeyBuilder(
 
 Review comment:
   We don't strictly need it, but I think this can be useful for tools like 
Bravo. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tweise commented on issue #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-14 Thread GitBox
tweise commented on issue #7249: [FLINK-11048] Ability to programmatically 
execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#issuecomment-447403037
 
 
   @mxm I think the constructor proliferation suffers from its own issues: 
besides the repetition, you need to decide how to prioritize parameters that 
naturally have no priority. And again, anything with a long parameter list is 
not only hard to read but also toxic for backward-compatible code evolution.
   
   Ideally we would just have one parameter to execute, which could be of type 
`ExecutionParameters` and hold the job name as well as the savepoint info. 
Anything else that might be needed in the future could be added without 
breaking the interface contract. But that isn't easy to accomplish due to how 
the code has been cast. The difficulty comes from the protected executeRemotely 
method that we cannot change. How about passing the savepoint parameter, or the 
above-mentioned parameter holder, through a thread local instead of the 
instance variable? How we pass it internally is cosmetic and not important to 
the user. The question of savepoint vs. generalized execution parameters seems 
more interesting. 
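A rough sketch of the parameter-object idea (the names are hypothetical, not the Flink API): all execution options travel in one immutable value object, so new options can be added later without breaking the `execute` signature:

```java
// Hypothetical parameter holder; not part of Flink's public API.
public final class ExecutionParameters {

    private final String jobName;
    private final String savepointPath;
    private final boolean allowNonRestoredState;

    private ExecutionParameters(String jobName, String savepointPath, boolean allowNonRestoredState) {
        this.jobName = jobName;
        this.savepointPath = savepointPath;
        this.allowNonRestoredState = allowNonRestoredState;
    }

    public static ExecutionParameters forJob(String jobName) {
        return new ExecutionParameters(jobName, null, false);
    }

    /** Returns a copy with savepoint restore information attached. */
    public ExecutionParameters withSavepoint(String savepointPath, boolean allowNonRestoredState) {
        return new ExecutionParameters(jobName, savepointPath, allowNonRestoredState);
    }

    public String getJobName() { return jobName; }
    public String getSavepointPath() { return savepointPath; }
    public boolean isAllowNonRestoredState() { return allowNonRestoredState; }
}

// Hypothetical usage: env.execute(ExecutionParameters.forJob("my-job").withSavepoint("s3://sp/1", false));
```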


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-14 Thread GitBox
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r241825068
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -283,6 +275,14 @@ public JobExecutionResult execute(String jobName) throws 
ProgramInvocationExcept
return executeRemotely(streamGraph, jarFiles);
}
 
+   /**
+* Execute the job with savepoint restore.
+*/
+   public JobExecutionResult execute(String jobName, 
SavepointRestoreSettings savepointRestoreSettings) throws 
ProgramInvocationException {
+   this.savepointRestoreSettings = savepointRestoreSettings;
+   return execute(jobName);
 
 Review comment:
   I understand that it is one-time, but I would still prefer to not have this 
side effect here. Could you either 1) make the SavepointSettings a `final` 
field and initialize it in the constructor or 2) provide the settings here as a 
parameter which is not stored in a field? Sorry for the hassle.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] mxm commented on a change in pull request #7249: [FLINK-11048] Ability to programmatically execute streaming pipeline with savepoint restore

2018-12-14 Thread GitBox
mxm commented on a change in pull request #7249: [FLINK-11048] Ability to 
programmatically execute streaming pipeline with savepoint restore
URL: https://github.com/apache/flink/pull/7249#discussion_r241822916
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
 ##
 @@ -65,6 +67,8 @@
/** The classpaths that need to be attached to each job. */
private final List globalClasspaths;
 
+   private SavepointRestoreSettings savepointRestoreSettings;
 
 Review comment:
   Could you document this field?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241816667
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala
 ##
 @@ -38,7 +41,25 @@ class EnumerableToLogicalTableScan(
 val oldRel = call.rel(0).asInstanceOf[EnumerableTableScan]
 val table = oldRel.getTable
 val newRel = LogicalTableScan.create(oldRel.getCluster, table)
-call.transformTo(newRel)
+
+val streamTable = table.unwrap(classOf[UpsertStreamTable[_]])
+val isUpsertTable = streamTable match {
+  case _: UpsertStreamTable[_] =>
+true
+  case _ =>
+false
+}
+
+if (isUpsertTable) {
 
 Review comment:
   you could merge this if check with match above to simplify code a bit.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241811123
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/DataStreamConversions.scala
 ##
 @@ -59,5 +59,38 @@ class DataStreamConversions[T](dataStream: DataStream[T], 
inputType: TypeInforma
 }
   }
 
+  /**
+* Converts the [[DataStream]] with upsert messages into a [[Table]] with 
keys.
+*
+* The message will be encoded as [[Tuple2]]. The first field is a 
[[Boolean]] flag, the second
+* field holds the record. A true [[Boolean]] flag indicates an update 
message, a false flag
+* indicates a delete message.
+*
+* The field name and key of the new [[Table]] can be specified like this:
+*
+* {{{
+*   val env = StreamExecutionEnvironment.getExecutionEnvironment
+*   val tEnv = TableEnvironment.getTableEnvironment(env)
+*
+*   val stream: DataStream[(Boolean, (String, Int))] = ...
+*   val table = stream.toKeyedTable(tEnv, 'name.key, 'amount)
+* }}}
+*
+* If field names are not explicitly specified, names are automatically 
extracted from the type
+* of the [[DataStream]].
+* If keys are not explicitly specified, an empty key will be used and the 
table will be a
+* single row table.
+*
+* @param tableEnv The [[StreamTableEnvironment]] in which the new 
[[Table]] is created.
+* @param fields The field names of the new [[Table]] (optional).
+* @return The resulting [[Table]].
+*/
+  def toKeyedTable(tableEnv: StreamTableEnvironment, fields: Expression*): 
Table = {
 
 Review comment:
   ```
   Methods in DataStreamConversions:
   
   toTableFromAppendStream
   toTableFromUpsertStream
   toTableFromRetractStream
   ```
   
   I think this makes sense. @twalthr do you think as well?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241820886
 
 

 ##
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/ExplainTest.scala
 ##
 @@ -65,21 +64,6 @@ class ExplainTest extends AbstractTestBase {
 assertEquals(expect, result)
   }
 
-  @Test
 
 Review comment:
   Why have you removed this test? Was it moved/superseded by something?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241817741
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/EnumerableToLogicalTableScan.scala
 ##
 @@ -22,13 +22,16 @@ import 
org.apache.calcite.adapter.enumerable.EnumerableTableScan
 import org.apache.calcite.plan.RelOptRule.{any, operand}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
 import org.apache.calcite.rel.logical.LogicalTableScan
+import org.apache.flink.table.plan.logical.rel.LogicalUpsertToRetraction
+import org.apache.flink.table.plan.schema.UpsertStreamTable
 
 /**
  * Rule that converts an EnumerableTableScan into a LogicalTableScan.
  * We need this rule because Calcite creates an EnumerableTableScan
  * when parsing a SQL query. We convert it into a LogicalTableScan
  * so we can merge the optimization process with any plan that might be created
- * by the Table API.
+ * by the Table API. The rule also checks whether the source is an upsert 
source and adds
 
 Review comment:
   This rule is only applied to `EnumerableTableScan` nodes. This comment 
suggests that `EnumerableTableScan` appears only in SQL, so something is wrong 
here. If we want to keep it as it is now, the comment needs updating.
   
   (please also check my comment below in `LogicalToEnumerableTableScan`)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241820550
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalToEnumerableTableScan.scala
 ##
 @@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.logical
+
+import org.apache.calcite.adapter.enumerable.EnumerableTableScan
+import org.apache.calcite.plan.RelOptRule.{any, operand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.rel.logical.LogicalTableScan
+
+/**
+ * Rule that converts a LogicalTableScan into an EnumerableTableScan. We need 
this rule to
 
 Review comment:
   What's the problem? That we cannot force Calcite to fire a rule only once? 
I would expect a single-pass rule that is fired only once (around the 
expand-plan optimisation phase), goes through the plan, checks whether the 
source is an upsert source and, if so, inserts `LogicalUpsertToRetraction`. 
   
   Alternatively, we can think about doing this one step earlier. In the Table 
API somebody is creating a `LogicalTableScan`, right? We could inject the logic 
that checks whether we need `LogicalUpsertToRetraction` there.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241807984
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ##
 @@ -573,6 +573,17 @@ abstract class StreamTableEnvironment(
 registerTableInternal(name, dataStreamTable)
   }
 
+  def getTypeFromUpsertStream[T](dataStream: DataStream[T]): 
TypeInformation[T] = {
 
 Review comment:
   private?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241809269
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
 ##
 @@ -224,14 +226,14 @@ case class ProctimeAttribute(expr: Expression) extends 
TimeAttribute(expr) {
   override def toString: String = s"proctime($child)"
 }
 
-case class Key(child: Expression) extends UnaryExpression {
-  override private[flink] def resultType: TypeInformation[_] = child.resultType
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
-throw new UnsupportedOperationException(
-  s"Key Expression can only be used during table initialization.")
-  }
-}
+//case class Key(child: Expression) extends UnaryExpression {
 
 Review comment:
   Some leftovers to remove?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6787: [FLINK-8577][table] Implement proctime DataStream to Table upsert conversion

2018-12-14 Thread GitBox
pnowojski commented on a change in pull request #6787: [FLINK-8577][table] 
Implement proctime DataStream to Table upsert conversion
URL: https://github.com/apache/flink/pull/6787#discussion_r241815618
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/RemoveDataStreamUpsertToRetractionRule.scala
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.datastream
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.RelOptRule.{none, operand}
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.flink.table.plan.nodes.datastream.{AccMode, 
DataStreamUpsertToRetraction}
+
+/**
+  * Rule to remove [[DataStreamUpsertToRetraction]] under [[AccMode]]. In this 
case, it is a no-op
 
 Review comment:
   `under [[AccMode.AccRetract]].`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zzchun commented on a change in pull request #7291: [hotfix] [docs] Display Connectors and Formats tables, and fix typo in documentation

2018-12-14 Thread GitBox
zzchun commented on a change in pull request #7291: [hotfix] [docs] Display 
Connectors and Formats tables, and fix typo in documentation
URL: https://github.com/apache/flink/pull/7291#discussion_r241819796
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -58,11 +56,7 @@ The following table list all available connectors and 
formats. Their mutual comp
 | JSON  | `flink-json` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar)
 |
 | Apache Avro   | `flink-avro` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar)
 |
 
-{% else %}
-
-This table is only available for stable releases.
-
-{% endif %}
 
 Review comment:
   @twalthr I have reverted my changes, could you please take another look? 
Thank you!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] GJL commented on issue #7302: [FLINK-11156] [tests] Reconcile powermock with JDK 9

2018-12-14 Thread GitBox
GJL commented on issue #7302: [FLINK-11156] [tests] Reconcile powermock with 
JDK 9
URL: https://github.com/apache/flink/pull/7302#issuecomment-447380751
 
 
   Thank you for your contribution to Apache Flink @TisonKun 
   I think an additional constructor annotated with `@VisibleForTesting` is 
more maintainable. The 
   current solution has the following downsides:
   - It is not obvious why `Comparator.comparing` does not work. The next 
person might try to refactor it back to the old solution and break Java 9 
compatibility.
   - The test uses powermock, which might break again in later Java versions.
   
   Let me loop in @tillrohrmann, the original author of the test. What do you 
think?
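
   For illustration, a minimal sketch of such a test-only constructor. The 
class and field names below are hypothetical and do not refer to the class 
under review:

```java
import java.util.Comparator;

import org.apache.flink.annotation.VisibleForTesting;

// Hypothetical example: the production constructor keeps its default
// comparator, while tests can inject a custom one without powermock.
public class RetrievalServiceSketch {

    private final Comparator<String> addressComparator;

    public RetrievalServiceSketch() {
        this(Comparator.naturalOrder());
    }

    @VisibleForTesting
    RetrievalServiceSketch(Comparator<String> addressComparator) {
        this.addressComparator = addressComparator;
    }

    public Comparator<String> getAddressComparator() {
        return addressComparator;
    }
}
```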


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #6777: [FLINK-10461] [State Backends, Checkpointing] Speed up download files when restore from DFS

2018-12-14 Thread GitBox
azagrebin commented on a change in pull request #6777: [FLINK-10461] [State 
Backends, Checkpointing] Speed up download files when restore from DFS
URL: https://github.com/apache/flink/pull/6777#discussion_r241815671
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDbStateDataTransfer.java
 ##
 @@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static 
org.apache.flink.runtime.concurrent.Executors.directExecutorService;
+
+/**
+ * Data transfer utils for RocksDbKeyedStateBackend.
+ */
+class RocksDbStateDataTransfer {
+
+   static void transferAllStateDataToDirectory(
+   IncrementalKeyedStateHandle restoreStateHandle,
+   Path dest,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry) throws Exception {
+
+   final Map<StateHandleID, StreamStateHandle> sstFiles =
+   restoreStateHandle.getSharedState();
+   final Map<StateHandleID, StreamStateHandle> miscFiles =
+   restoreStateHandle.getPrivateState();
+
+   downloadDataForAllStateHandles(sstFiles, dest, 
restoringThreadNum, closeableRegistry);
+   downloadDataForAllStateHandles(miscFiles, dest, 
restoringThreadNum, closeableRegistry);
+   }
+
+   /**
+* Copies all the files from the given stream state handles to the 
given path, renaming the files w.r.t. their
+* {@link StateHandleID}.
+*/
+   private static void downloadDataForAllStateHandles(
+   Map<StateHandleID, StreamStateHandle> stateHandleMap,
+   Path restoreInstancePath,
+   int restoringThreadNum,
+   CloseableRegistry closeableRegistry) throws 
InterruptedException, IOException {
+
+   final ExecutorService executorService = 
createExecutorService(restoringThreadNum);
+
+   try {
+   List<Runnable> runnables = 
createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry);
+   List<CompletableFuture<Void>> futures = new 
ArrayList<>(runnables.size());
+   for (Runnable runnable : runnables) {
+   
futures.add(CompletableFuture.runAsync(runnable, executorService));
+   }
+
+   FutureUtils.waitForAll(futures).get();
+   } catch (ExecutionException e) {
+   final Throwable throwable = 
ExceptionUtils.stripExecutionException(e);
+   if (throwable instanceof IOException) {
+   throw (IOException) throwable;
 
 Review comment:
   I do not think we need to wrap non-IOException with IOException. I would 
leave it just as `throw ExceptionUtils.stripExecutionException(e)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…

2018-12-14 Thread GitBox
azagrebin commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r241812506
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 ##
 @@ -726,6 +728,56 @@ void sendPartitionInfos() {
}
}
 
+   /**
+* Check whether the InputDependencyConstraint is satisfied for this 
vertex.
+*
+* @return whether the input constraint is satisfied
+*/
+   public boolean checkInputDependencyConstraints() {
 
 Review comment:
   Ok, I agree to leave it as it is. As we expect it to always be true in 
`scheduleOrUpdateConsumers`, I would also agree not to have the shortcut. If we 
want to have the shortcut, I would then leave a comment explaining why we have 
it, like you mentioned above.
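
   For illustration, a minimal sketch of a shortcut with such a comment. All 
names below are hypothetical and do not come from the PR:

```java
// Hypothetical sketch only; not the code under review.
public final class InputConstraintCheckSketch {

    // One flag per consumed input; empty for source vertices.
    private final boolean[] inputsConsumable;

    InputConstraintCheckSketch(boolean[] inputsConsumable) {
        this.inputsConsumable = inputsConsumable;
    }

    boolean checkInputDependencyConstraints() {
        // Shortcut: a vertex without inputs is trivially satisfied. Callers
        // such as scheduleOrUpdateConsumers are expected to pass vertices that
        // do have inputs, so this mainly guards other call sites.
        if (inputsConsumable.length == 0) {
            return true;
        }
        // "ALL" semantics: every consumed input must be consumable.
        for (boolean consumable : inputsConsumable) {
            if (!consumable) {
                return false;
            }
        }
        return true;
    }
}
```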


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10928) Job unable to stabilise after restart

2018-12-14 Thread Daniel Harper (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Harper updated FLINK-10928:
--
Attachment: Screen Shot 2018-12-10 at 14.13.52.png

> Job unable to stabilise after restart 
> --
>
> Key: FLINK-10928
> URL: https://issues.apache.org/jira/browse/FLINK-10928
> Project: Flink
>  Issue Type: Bug
> Environment: AWS EMR 5.17.0
> FLINK 1.5.2
> BEAM 2.7.0
>Reporter: Daniel Harper
>Priority: Major
> Attachments: Screen Shot 2018-11-16 at 15.49.03.png, Screen Shot 
> 2018-11-16 at 15.49.15.png, Screen Shot 2018-12-10 at 14.13.52.png, 
> ants-CopyofThe'death'spiralincident-191118-1231-1332.pdf
>
>
> We've seen a few instances of this occurring in production now (it's 
> difficult to reproduce) 
> I've attached a timeline of events as a PDF here  
> [^ants-CopyofThe'death'spiralincident-191118-1231-1332.pdf]  but essentially 
> it boils down to
> 1. Job restarts due to exception
> 2. Job restores from a checkpoint but we get the exception
> {code}
> Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: 
> Timeout waiting for connection from pool
> {code}
> 3. Job restarts
> 4. Job restores from a checkpoint but we get the same exception
>  repeat a few times within 2-3 minutes
> 5. YARN kills containers with out of memory
> {code}
> 2018-11-14 00:16:04,430 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Closing TaskExecutor connection 
> container_1541433014652_0001_01_000716 because: Container 
> [pid=7725,containerID=container_1541433014652_0001_01_
> 000716] is running beyond physical memory limits. Current usage: 6.4 GB of 
> 6.4 GB physical memory used; 8.4 GB of 31.9 GB virtual memory used. Killing 
> container.
> Dump of the process-tree for container_1541433014652_0001_01_000716 :
> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> |- 7725 7723 7725 7725 (bash) 0 0 115863552 696 /bin/bash -c 
> /usr/lib/jvm/java-openjdk/bin/java -Xms4995m -Xmx4995m 
> -XX:MaxDirectMemorySize=1533m 
> -Xloggc:/var/log/hadoop-yarn/flink_gc_container_1541433014652_0001_%p.log 
> -XX:GCLogF
> ileSize=200M -XX:NumberOfGCLogFiles=10 -XX:+PrintGCDetails 
> -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCCause 
> -XX:+PrintGCDateStamps -XX:+UseG1GC 
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1541433014652_00
> 01/container_1541433014652_0001_01_000716/taskmanager.log 
> -Dlog4j.configuration=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> 
> /var/log/hadoop-yarn/containers/application_1541433014652_0001/container
> _1541433014652_0001_01_000716/taskmanager.out 2> 
> /var/log/hadoop-yarn/containers/application_1541433014652_0001/container_1541433014652_0001_01_000716/taskmanager.err
> |- 7738 7725 7725 7725 (java) 6959576 976377 8904458240 1671684 
> /usr/lib/jvm/java-openjdk/bin/java -Xms4995m -Xmx4995m 
> -XX:MaxDirectMemorySize=1533m 
> -Xloggc:/var/log/hadoop-yarn/flink_gc_container_1541433014652_0001_%p.log 
> -XX:GCL
> ogFileSize=200M -XX:NumberOfGCLogFiles=10 -XX:+PrintGCDetails 
> -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCCause 
> -XX:+PrintGCDateStamps -XX:+UseG1GC 
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1541433014652
> _0001/container_1541433014652_0001_01_000716/taskmanager.log 
> -Dlog4j.configuration=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir .
>  
> Container killed on request. Exit code is 143
> Container exited with a non-zero exit code 143
> {code}
> 6. YARN allocates new containers but the job is never able to get back into a 
> stable state, with constant restarts until eventually the job is cancelled 
> We've seen something similar to FLINK-10848 happening to with some task 
> managers allocated but sitting 'idle' state. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10928) Job unable to stabilise after restart

2018-12-14 Thread Daniel Harper (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721569#comment-16721569
 ] 

Daniel Harper commented on FLINK-10928:
---

Hi [~dawidwys] 

Going to be tricky to provide a heap dump due to sensitive data unfortunately.

We've resolved the connection timeouts issue by increasing the connection pool 
size from 15 to 30, which after running this for 7 days or so has resulted in 0 
'timeout waiting for connection from pool' errors when the job restarts and 
restores from a checkpoint

One of the causes of the job restarting in the first place is FLINK-10844, 
which makes the checkpoint fail (note this is intermittent; we see it once or 
twice a day) and thereby causes the job to restart.

We are looking at setting {{failOnCheckpointingErrors}} to false to mitigate 
this in the meantime, although we understand the risk of enabling this setting.

This 'death spiral'/instability has happened 3 or so times in the past 6 weeks, 
and we see the job restarting once or twice a day between these massive 
failures. The only thing I can think of is a memory leak building up over time 
and eventually triggering YARN to kill the containers.

I did a heap dump on one of the taskmanagers this morning, and it looks like 
there are multiple copies of 'user' classes (i.e. BEAM code and our code), most 
of which have 0 instances, which looks like a classloader leak to me. This 
snapshot was taken after the job had restarted about 10 times. 

 !Screen Shot 2018-12-10 at 14.13.52.png! 


> Job unable to stabilise after restart 
> --
>
> Key: FLINK-10928
> URL: https://issues.apache.org/jira/browse/FLINK-10928
> Project: Flink
>  Issue Type: Bug
> Environment: AWS EMR 5.17.0
> FLINK 1.5.2
> BEAM 2.7.0
>Reporter: Daniel Harper
>Priority: Major
> Attachments: Screen Shot 2018-11-16 at 15.49.03.png, Screen Shot 
> 2018-11-16 at 15.49.15.png, 
> ants-CopyofThe'death'spiralincident-191118-1231-1332.pdf
>
>
> We've seen a few instances of this occurring in production now (it's 
> difficult to reproduce) 
> I've attached a timeline of events as a PDF here  
> [^ants-CopyofThe'death'spiralincident-191118-1231-1332.pdf]  but essentially 
> it boils down to
> 1. Job restarts due to exception
> 2. Job restores from a checkpoint but we get the exception
> {code}
> Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: 
> Timeout waiting for connection from pool
> {code}
> 3. Job restarts
> 4. Job restores from a checkpoint but we get the same exception
>  repeat a few times within 2-3 minutes
> 5. YARN kills containers with out of memory
> {code}
> 2018-11-14 00:16:04,430 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Closing TaskExecutor connection 
> container_1541433014652_0001_01_000716 because: Container 
> [pid=7725,containerID=container_1541433014652_0001_01_
> 000716] is running beyond physical memory limits. Current usage: 6.4 GB of 
> 6.4 GB physical memory used; 8.4 GB of 31.9 GB virtual memory used. Killing 
> container.
> Dump of the process-tree for container_1541433014652_0001_01_000716 :
> |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> |- 7725 7723 7725 7725 (bash) 0 0 115863552 696 /bin/bash -c 
> /usr/lib/jvm/java-openjdk/bin/java -Xms4995m -Xmx4995m 
> -XX:MaxDirectMemorySize=1533m 
> -Xloggc:/var/log/hadoop-yarn/flink_gc_container_1541433014652_0001_%p.log 
> -XX:GCLogF
> ileSize=200M -XX:NumberOfGCLogFiles=10 -XX:+PrintGCDetails 
> -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCCause 
> -XX:+PrintGCDateStamps -XX:+UseG1GC 
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1541433014652_00
> 01/container_1541433014652_0001_01_000716/taskmanager.log 
> -Dlog4j.configuration=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> 
> /var/log/hadoop-yarn/containers/application_1541433014652_0001/container
> _1541433014652_0001_01_000716/taskmanager.out 2> 
> /var/log/hadoop-yarn/containers/application_1541433014652_0001/container_1541433014652_0001_01_000716/taskmanager.err
> |- 7738 7725 7725 7725 (java) 6959576 976377 8904458240 1671684 
> /usr/lib/jvm/java-openjdk/bin/java -Xms4995m -Xmx4995m 
> -XX:MaxDirectMemorySize=1533m 
> -Xloggc:/var/log/hadoop-yarn/flink_gc_container_1541433014652_0001_%p.log 
> -XX:GCL
> ogFileSize=200M -XX:NumberOfGCLogFiles=10 -XX:+PrintGCDetails 
> -XX:+PrintGCTimeStamps -XX:+PrintTenuringDistribution -XX:+PrintGCCause 
> -XX:+PrintGCDateStamps -XX:+UseG1GC 
> -Dlog.file=/var/log/hadoop-yarn/containers/application_1541433014652
> _0001/container_1541433014652_0001_01_000716/taskmanager.log 
> 

[jira] [Closed] (FLINK-9555) Support table api in scala shell

2018-12-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9555.
---
Resolution: Fixed

Reverted for 1.7.1 in: d00dd3323d53cba323bde5ebfe390ffdb4d777d4

> Support table api in scala shell
> 
>
> Key: FLINK-9555
> URL: https://issues.apache.org/jira/browse/FLINK-9555
> Project: Flink
>  Issue Type: New Feature
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Assignee: shuiqiangchen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> It would be nice to have table api available in scala shell so that user can 
> experience table api in interactive way. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11168) LargePlanTest times out on Travis

2018-12-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-11168:
-
Affects Version/s: (was: 1.7.0)
   1.7.1

> LargePlanTest times out on Travis
> -
>
> Key: FLINK-11168
> URL: https://issues.apache.org/jira/browse/FLINK-11168
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.1, 1.8.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The newly added {{LargePlanTest}} has timed out on Travis: 
> https://travis-ci.org/apache/flink/jobs/467949929
> {code}
> 13:58:43.593 [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time 
> elapsed: 15.201 s <<< FAILURE! - in 
> org.apache.flink.test.planning.LargePlanTest
> 13:58:43.599 [ERROR] 
> testPlanningOfLargePlan(org.apache.flink.test.planning.LargePlanTest)  Time 
> elapsed: 15.028 s  <<< ERROR!
> org.junit.runners.model.TestTimedOutException: test timed out after 15000 
> milliseconds
>   at 
> org.apache.flink.test.planning.LargePlanTest.runProgram(LargePlanTest.java:51)
>   at 
> org.apache.flink.test.planning.LargePlanTest.testPlanningOfLargePlan(LargePlanTest.java:39)
> {code}
> FYI [~mxm]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-9555) Support table api in scala shell

2018-12-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-9555:
-

Will be reverted for 1.7.1 since it introduced bugs.

> Support table api in scala shell
> 
>
> Key: FLINK-9555
> URL: https://issues.apache.org/jira/browse/FLINK-9555
> Project: Flink
>  Issue Type: New Feature
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Assignee: shuiqiangchen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> It would be nice to have table api available in scala shell so that user can 
> experience table api in interactive way. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9555) Support table api in scala shell

2018-12-14 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-9555:

Fix Version/s: (was: 1.7.1)

> Support table api in scala shell
> 
>
> Key: FLINK-9555
> URL: https://issues.apache.org/jira/browse/FLINK-9555
> Project: Flink
>  Issue Type: New Feature
>  Components: Scala Shell
>Affects Versions: 1.5.0
>Reporter: Jeff Zhang
>Assignee: shuiqiangchen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> It would be nice to have table api available in scala shell so that user can 
> experience table api in interactive way. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded

2018-12-14 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721561#comment-16721561
 ] 

Piotr Nowojski commented on FLINK-9717:
---

[~kisimple], this ticket is not as easy as you might think (nor as simple as I 
had thought when I was creating it) - the optimisation is relatively trivial to 
implement if you do not consider restoring from checkpoints/savepoints. The 
current problem is that Flink doesn't keep and restore the last emitted 
watermark when restoring from a checkpoint, and this is hard to work around.

In other words, we can easily "flush" one side of the join when we receive 
{{MAX_WATERMARK}}, but what should happen after restoring from a checkpoint? 
There is no easy way to store the information that {{MAX_WATERMARK}} was 
previously reached. As far as I have thought about it, this cannot be stored in 
the state of the join operator, and even if it could be done that way, it would 
probably not be the proper/elegant solution. The correct solution is probably 
to store {{MAX_WATERMARK}} in the state around the watermark emitter/source 
operator, and to re-emit the last previously emitted watermark when the job is 
restored.
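
For illustration, a minimal sketch of the "flush on {{MAX_WATERMARK}}" idea. 
This is not Flink's join operator and it deliberately ignores the restore 
problem described above:

{code:java}
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: once one input has reported MAX_WATERMARK, the
// state buffered for matching against that input's future rows can be dropped.
public final class BoundedSideFlushSketch<L> {

    static final long MAX_WATERMARK = Long.MAX_VALUE;

    // Left rows kept so that later right rows can be joined against them.
    private final List<L> leftStateForFutureRightRows = new ArrayList<>();
    private boolean rightSideBounded;

    void onLeftElement(L element) {
        if (!rightSideBounded) {
            leftStateForFutureRightRows.add(element);
        }
        // ... join 'element' against the already buffered right-side state ...
    }

    void onRightWatermark(long watermark) {
        if (watermark == MAX_WATERMARK) {
            // No further right rows can arrive, so this state is never read again.
            rightSideBounded = true;
            leftStateForFutureRightRows.clear();
        }
    }
}
{code}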

> Flush state of one side of the join if other side is bounded
> 
>
> Key: FLINK-9717
> URL: https://issues.apache.org/jira/browse/FLINK-9717
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: boshu Zheng
>Priority: Major
>
> Whenever one side of join receives {{MAX_WATERMARK}}, other side in joins 
> (both normal and versioned joins) could flush the state from other side.
> This highly useful optimisation that would speed up versioned joins and would 
> allow normal joins of large unbounded streams with bounded tables (for 
> example some static data).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-11168) LargePlanTest times out on Travis

2018-12-14 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-11168:


 Summary: LargePlanTest times out on Travis
 Key: FLINK-11168
 URL: https://issues.apache.org/jira/browse/FLINK-11168
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.7.0, 1.8.0
Reporter: Chesnay Schepler


The newly added {{LargePlanTest}} has timed out on Travis: 
https://travis-ci.org/apache/flink/jobs/467949929

{code}
13:58:43.593 [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 15.201 s <<< FAILURE! - in org.apache.flink.test.planning.LargePlanTest
13:58:43.599 [ERROR] 
testPlanningOfLargePlan(org.apache.flink.test.planning.LargePlanTest)  Time 
elapsed: 15.028 s  <<< ERROR!
org.junit.runners.model.TestTimedOutException: test timed out after 15000 
milliseconds
at 
org.apache.flink.test.planning.LargePlanTest.runProgram(LargePlanTest.java:51)
at 
org.apache.flink.test.planning.LargePlanTest.testPlanningOfLargePlan(LargePlanTest.java:39)
{code}

FYI [~mxm]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on issue #7289: [FLINK-11001][table] Fix window rowtime attribute can't be renamed bug in Java

2018-12-14 Thread GitBox
twalthr commented on issue #7289: [FLINK-11001][table] Fix window rowtime 
attribute can't be renamed bug in Java
URL: https://github.com/apache/flink/pull/7289#issuecomment-447366595
 
 
   @hequn8128 have you checked if the docs need to be updated somewhere?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #7291: [hotfix] [docs] Display Connectors and Formats tables, and fix typo in documentation

2018-12-14 Thread GitBox
twalthr commented on a change in pull request #7291: [hotfix] [docs] Display 
Connectors and Formats tables, and fix typo in documentation
URL: https://github.com/apache/flink/pull/7291#discussion_r241799867
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -58,11 +56,7 @@ The following table list all available connectors and 
formats. Their mutual comp
 | JSON  | `flink-json` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar)
 |
 | Apache Avro   | `flink-avro` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar)
 |
 
-{% else %}
-
-This table is only available for stable releases.
-
-{% endif %}
 
 Review comment:
   You are using docs of an unstable release. Why should this be confusing? 
Please undo these changes.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] asfgit closed pull request #7311: [hotfix][docs] Correct the field name in Connect to External Systems doc

2018-12-14 Thread GitBox
asfgit closed pull request #7311: [hotfix][docs] Correct the field name in 
Connect to External Systems doc
URL: https://github.com/apache/flink/pull/7311
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 1cc97a39377..6d7db117c97 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -154,7 +154,7 @@ tableEnvironment
 new Schema()
   .field("rowtime", Types.SQL_TIMESTAMP)
 .rowtime(new Rowtime()
-  .timestampsFromField("ts")
+  .timestampsFromField("timestamp")
   .watermarksPeriodicBounded(6)
 )
   .field("user", Types.LONG)
@@ -1166,7 +1166,7 @@ ClusterBuilder builder = ... // configure Cassandra 
cluster connection
 CassandraAppendTableSink sink = new CassandraAppendTableSink(
   builder,
   // the query must match the schema of the table
-  INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?));
+  "INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)");
 
 tableEnv.registerTableSink(
   "cassandraOutputTable",


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…

2018-12-14 Thread GitBox
azagrebin commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r241794342
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexInputConstraintTest.java
 ##
 @@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.InputDependencyConstraint;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.isInExecutionState;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitForAllExecutionsPredicate;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilExecutionVertexState;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the inputs constraint for {@link ExecutionVertex}.
+ */
+public class ExecutionVertexInputConstraintTest extends TestLogger {
+
+   @Test
+   public void testInputConsumable() throws Exception {
+   JobVertex v1 = new JobVertex("vertex1");
+   JobVertex v2 = new JobVertex("vertex2");
+   JobVertex v3 = new JobVertex("vertex3");
+   v1.setParallelism(2);
+   v2.setParallelism(2);
+   v3.setParallelism(2);
+   v1.setInvokableClass(AbstractInvokable.class);
+   v2.setInvokableClass(AbstractInvokable.class);
+   v3.setInvokableClass(AbstractInvokable.class);
+   v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, 
ResultPartitionType.PIPELINED);
+   v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, 
ResultPartitionType.BLOCKING);
+   List<JobVertex> ordered = Arrays.asList(v1, v2, v3);
+   ExecutionGraph eg = createExecutionGraph(ordered);
+
+   ExecutionVertex ev11 = 
eg.getJobVertex(v1.getID()).getTaskVertices()[0];
+   ExecutionVertex ev12 = 
eg.getJobVertex(v1.getID()).getTaskVertices()[1];
+   ExecutionVertex ev21 = 
eg.getJobVertex(v2.getID()).getTaskVertices()[0];
+   ExecutionVertex ev22 = 
eg.getJobVertex(v2.getID()).getTaskVertices()[1];
+   ExecutionVertex ev31 = 
eg.getJobVertex(v3.getID()).getTaskVertices()[0];
+   ExecutionVertex ev32 = 
eg.getJobVertex(v3.getID()).getTaskVertices()[1];
+
+   eg.scheduleForExecution();
 
 Review comment:
   Yes, I meant that a `@Before void init()` method could recreate a new graph 
before each test run.
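
   For reference, a generic JUnit 4 sketch of that pattern; the list here 
merely stands in for the execution graph built in the real test:

```java
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;

import org.junit.Before;
import org.junit.Test;

// Generic sketch: a @Before method recreates the fixture before every test,
// so no test ever observes mutations (e.g. scheduling) done by another test.
public class RecreateFixturePerTestSketch {

    private List<String> fixture;

    @Before
    public void init() {
        fixture = new ArrayList<>();
        fixture.add("vertex1");
        fixture.add("vertex2");
        fixture.add("vertex3");
    }

    @Test
    public void testMutationIsVisibleWithinOneTest() {
        fixture.add("extra");
        assertTrue(fixture.contains("extra"));
    }

    @Test
    public void testOtherTestsSeeAFreshFixture() {
        // Passes regardless of test order because init() rebuilt the fixture.
        assertEquals(3, fixture.size());
    }
}
```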


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] asfgit closed pull request #7307: [hotfix][docs] Improve the correctness in Detecting Patterns doc

2018-12-14 Thread GitBox
asfgit closed pull request #7307: [hotfix][docs] Improve the correctness in 
Detecting Patterns doc
URL: https://github.com/apache/flink/pull/7307
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/streaming/match_recognize.md 
b/docs/dev/table/streaming/match_recognize.md
index 01799cc47b4..e38fb5c1129 100644
--- a/docs/dev/table/streaming/match_recognize.md
+++ b/docs/dev/table/streaming/match_recognize.md
@@ -107,7 +107,7 @@ The table has a following schema:
 
 {% highlight text %}
 Ticker
- |-- symbol: Long # symbol of the stock
+ |-- symbol: String   # symbol of the stock
  |-- price: Long  # price of the stock
  |-- tax: Long# tax liability of the stock
  |-- rowtime: TimeIndicatorTypeInfo(rowtime)  # point in time when the 
change to those values happened
@@ -293,6 +293,7 @@ The same query where `B*` is modified to `B*?`, which means 
that `B*` should be
  symbol   lastPrice
  ===
  XYZ  13
+ XYZ  16
 {% endhighlight %}
 
 The pattern variable `B` matches only to the row with price `12` instead of 
swallowing the rows with prices `12`, `13`, and `14`.
@@ -842,7 +843,7 @@ The last result matched against the rows #5, #6.
 This combination will produce a runtime exception because one would always try 
to start a new match where the
 last one started. This would produce an infinite loop and, thus, is prohibited.
 
-One has to keep in mind that in case of the `SKIP TO FIRST/LAST 
variable`strategy it might be possible that there are no rows mapped to that
+One has to keep in mind that in case of the `SKIP TO FIRST/LAST variable` 
strategy it might be possible that there are no rows mapped to that
 variable (e.g. for pattern `A*`). In such cases, a runtime exception will be 
thrown as the standard requires a valid row to continue the
 matching.
 


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE

2018-12-14 Thread GitBox
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r241787811
 
 

 ##
 File path: docs/dev/table/streaming/match_recognize.md
 ##
 @@ -211,6 +211,65 @@ If a condition is not defined for a pattern variable, a 
default condition will b
 
 For a more detailed explanation about expressions that can be used in those 
clauses, please have a look at the [event stream 
navigation](#pattern-navigation) section.
 
+### Aggregations
+
+Aggregations can be used in `DEFINE` and `MEASURES` clauses. Both 
[built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) and custom 
[user defined]({{ site.baseurl }}/dev/table/udfs.html) functions are supported.
+
+Aggregate functions are applied to each subset of rows mapped to a match. In 
order to understand how those subsets are evaluated have a look at the [event 
stream navigation](#pattern-navigation) section.
+
+The task of the following example is to find the longest period of time for 
which the average price of a ticker did not go below certain threshold. It 
shows how expressible `MATCH_RECOGNIZE` can become with aggregations.
+This task can be performed with the following query:
+
+{% highlight sql %}
+SELECT *
+FROM Ticker
+MATCH_RECOGNIZE (
+PARTITION BY symbol
+ORDER BY rowtime
+MEASURES
+FIRST(A.rowtime) AS start_tstamp,
+LAST(A.rowtime) AS end_tstamp,
+AVG(A.price) AS avgPrice
+ONE ROW PER MATCH
+AFTER MATCH SKIP TO FIRST B
+PATTERN (A+ B)
+DEFINE
+A AS AVG(A.price) < 15
+) MR;
 
 Review comment:
   @dawidwys what is `MR` actually doing? Do we have to call `SELECT 
MR.start_tstamp`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-11116) Overwrite outdated in-progress files in StreamingFileSink.

2018-12-14 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-6:
---
Description: 
In order to guarantee exactly-once semantics, the streaming file sink is 
implementing a two-phase commit protocol when writing files to the filesystem.

Initially data is written to in-progress files. These files are then put into 
"pending" state when they are completed (based on the rolling policy), and they 
are finally committed when the checkpoint that put them in the "pending" state 
is acknowledged as complete.

The above shows that in the case that we have:
1) checkpoints A, B, C coming 
2) checkpoint A being acknowledged and 
3) failure

Then we may have files that do not belong to any checkpoint (because B and C 
were not considered successful). These files are currently not cleaned up.

In order to reduce the amount of such files created, we removed the random 
suffix from in-progress temporary files, so that the next in-progress file that 
is opened for this part, overwrites them.

  was:
In order to guarantee exactly-once semantics, the streaming file sink is 
implementing a two-phase commit protocol when writing files to the filesystem.

Initially data is written to in-progress files. These files are then put into 
"pending" state when they are completed (based on the rolling policy), and they 
are finally committed when the checkpoint that put them in the "pending" state 
is acknowledged as complete.

The above shows that in the case that we have:
1) checkpoints A, B, C coming 
2) checkpoint A being acknowledged and 
3) failure

Then we may have files that do not belong to any checkpoint (because B and C 
were not considered successful). These files are currently not cleaned up.

This issue aims at cleaning up these files.


> Overwrite outdated in-progress files in StreamingFileSink.
> --
>
> Key: FLINK-6
> URL: https://issues.apache.org/jira/browse/FLINK-6
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.7.2
>
>
> In order to guarantee exactly-once semantics, the streaming file sink is 
> implementing a two-phase commit protocol when writing files to the filesystem.
> Initially data is written to in-progress files. These files are then put into 
> "pending" state when they are completed (based on the rolling policy), and 
> they are finally committed when the checkpoint that put them in the "pending" 
> state is acknowledged as complete.
> The above shows that in the case that we have:
> 1) checkpoints A, B, C coming 
> 2) checkpoint A being acknowledged and 
> 3) failure
> Then we may have files that do not belong to any checkpoint (because B and C 
> were not considered successful). These files are currently not cleaned up.
> In order to reduce the amount of such files created, we removed the random 
> suffix from in-progress temporary files, so that the next in-progress file 
> that is opened for this part, overwrites them.
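
For context, a minimal usage sketch of the sink discussed above, assuming a 
row-format sink that writes strings to a local path; checkpointing must be 
enabled so that pending files are eventually committed:

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Sketch: data is written to in-progress files, rolled to "pending" files, and
// committed once the checkpoint that made them pending completes.
public class StreamingFileSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // commits pending files on completed checkpoints

        DataStream<String> input = env.fromElements("a", "b", "c");

        StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path("/tmp/streaming-file-sink-output"),
                new SimpleStringEncoder<String>("UTF-8"))
            .build();

        input.addSink(sink);
        env.execute("streaming-file-sink-sketch");
    }
}
{code}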



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11116) Overwrite outdated in-progress files in StreamingFileSink.

2018-12-14 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-6?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas updated FLINK-6:
---
Summary: Overwrite outdated in-progress files in StreamingFileSink.  (was: 
Clean-up temporary files that upon recovery, they belong to no checkpoint.)

> Overwrite outdated in-progress files in StreamingFileSink.
> --
>
> Key: FLINK-6
> URL: https://issues.apache.org/jira/browse/FLINK-6
> Project: Flink
>  Issue Type: Improvement
>  Components: filesystem-connector
>Affects Versions: 1.7.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.7.2
>
>
> In order to guarantee exactly-once semantics, the streaming file sink is 
> implementing a two-phase commit protocol when writing files to the filesystem.
> Initially data is written to in-progress files. These files are then put into 
> "pending" state when they are completed (based on the rolling policy), and 
> they are finally committed when the checkpoint that put them in the "pending" 
> state is acknowledged as complete.
> The above shows that in the case that we have:
> 1) checkpoints A, B, C coming 
> 2) checkpoint A being acknowledged and 
> 3) failure
> Then we may have files that do not belong to any checkpoint (because B and C 
> were not considered successful). These files are currently not cleaned up.
> This issue aims at cleaning up these files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11167) Optimize RocksDBList#put for no empty input

2018-12-14 Thread Congxian Qiu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu updated FLINK-11167:
-
Component/s: State Backends, Checkpointing

> Optimize RocksDBList#put for no empty input
> ---
>
> Key: FLINK-11167
> URL: https://issues.apache.org/jira/browse/FLINK-11167
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>
> In `RocksDBListState.putInternal` we will first remove the current state and 
> then add the new list if needed (i.e. the list is not empty), as shown below.
> I think that if the list is not empty, we could {color:#FF}skip the remove 
> operation{color}, since the subsequent put overwrites the key anyway.
>  
> {code:java}
> public void updateInternal(List<V> values) {
>Preconditions.checkNotNull(values, "List of values to add cannot be 
> null.");
>clear();
>if (!values.isEmpty()) {
>   try {
>  writeCurrentKeyWithGroupAndNamespace();
>  byte[] key = dataOutputView.getCopyOfBuffer();
>  byte[] premerge = getPreMergedValue(values, elementSerializer, 
> dataOutputView);
>  backend.db.put(columnFamily, writeOptions, key, premerge);
>   } catch (IOException | RocksDBException e) {
>  throw new FlinkRuntimeException("Error while updating data to 
> RocksDB", e);
>   }
>}
> }
> {code}
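
A sketch of the proposed change, based on the snippet above (not the final 
patch; member names follow the quoted code):

{code:java}
public void updateInternal(List<V> values) {
   Preconditions.checkNotNull(values, "List of values to add cannot be null.");

   if (values.isEmpty()) {
      // Nothing to write: only remove the existing entry.
      clear();
      return;
   }

   try {
      writeCurrentKeyWithGroupAndNamespace();
      byte[] key = dataOutputView.getCopyOfBuffer();
      byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
      // put() overwrites any previous value for this key, so no separate clear() is needed.
      backend.db.put(columnFamily, writeOptions, key, premerge);
   } catch (IOException | RocksDBException e) {
      throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
   }
}
{code}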



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] KarmaGYZ opened a new pull request #7311: [hotfix][docs] Correct the field name in Connect to External Systems doc

2018-12-14 Thread GitBox
KarmaGYZ opened a new pull request #7311: [hotfix][docs] Correct the field name 
in Connect to External Systems doc
URL: https://github.com/apache/flink/pull/7311
 
 
   ## What is the purpose of the change
   
   This PR corrects the field name in the Connect to External Systems doc and 
adds the missing quotation marks around a SQL command.
   
   ## Brief change log
   
   -  correct the field name in the Connect to External Systems doc
   -  add the missing quotation marks around a SQL command
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10595) Support patterns that can produce empty matches

2018-12-14 Thread bupt_ljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721431#comment-16721431
 ] 

bupt_ljy commented on FLINK-10595:
--

[~dawidwys]
I'm very curious about this empty match. Could you tell me more about the 
background or the use cases for it?

> Support patterns that can produce empty matches
> ---
>
> Key: FLINK-10595
> URL: https://issues.apache.org/jira/browse/FLINK-10595
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API  SQL
>Reporter: Dawid Wysakowicz
>Priority: Major
>
> CEP library does not emit empty matches. In order to be SQL standard 
> compliant we explicitly forbid patterns that can produce empty 
> matches(patterns that have only optional states) e.g A*



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10566) Flink Planning is exponential in the number of stages

2018-12-14 Thread Maximilian Michels (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels updated FLINK-10566:
---
Affects Version/s: 1.8.0

> Flink Planning is exponential in the number of stages
> -
>
> Key: FLINK-10566
> URL: https://issues.apache.org/jira/browse/FLINK-10566
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.5.4, 1.6.1, 1.7.0, 1.8.0
>Reporter: Robert Bradshaw
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.1, 1.8.0
>
> Attachments: chart.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This makes it nearly impossible to run graphs with 100 or more stages. (The 
> execution itself is still sub-second, but the job submission takes 
> increasingly long.)
> I can reproduce this with the following pipeline, which resembles my 
> real-world workloads (with depth up to 10 and width up to, and past, 50). On 
> Flink, going beyond a width of 10 seems problematic (it times out after 
> hours). Note the log scale on the chart for time. 
>  
> {code:java}
>   public static void runPipeline(int depth, int width) throws Exception {
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> DataSet<String> input = env.fromElements("a", "b", "c");
> DataSet<String> stats = null;
> for (int i = 0; i < depth; i++) {
>   stats = analyze(input, stats, width / (i + 1) + 1);
> }
> stats.writeAsText("out.txt");
> env.execute("depth " + depth + " width " + width);
>   }
>   public static DataSet<String> analyze(DataSet<String> input, 
> DataSet<String> stats, int branches) {
> System.out.println("analyze " + branches);
> for (int i = 0; i < branches; i++) {
>   final int ii = i;
>   if (stats != null) {
> input = input.map(new RichMapFunction<String, String>() {
> @Override
> public void open(Configuration parameters) throws Exception {
>   Collection<String> broadcastSet = 
> getRuntimeContext().getBroadcastVariable("stats");
> }
> @Override
> public String map(String value) throws Exception {
>   return value;
> }
>   }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
>   }
>   DataSet<String> branch = input
>.map(s -> new Tuple2(0, s + 
> ii))
>.groupBy(0)
>.minBy(1)
>.map(kv -> kv.f1);
>   if (stats == null) {
> stats = branch;
>   } else {
> stats = stats.union(branch);
>   }
> }
> return stats.map(s -> "(" + s + ").stats");
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10566) Flink Planning is exponential in the number of stages

2018-12-14 Thread Maximilian Michels (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maximilian Michels resolved FLINK-10566.

   Resolution: Fixed
Fix Version/s: 1.8.0
   1.7.1
   1.6.3
   1.5.6

Backported the fix to the 1.5, 1.6 and the 1.7 release branches.

> Flink Planning is exponential in the number of stages
> -
>
> Key: FLINK-10566
> URL: https://issues.apache.org/jira/browse/FLINK-10566
> Project: Flink
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 1.5.4, 1.6.1, 1.7.0
>Reporter: Robert Bradshaw
>Assignee: Maximilian Michels
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.6, 1.6.3, 1.7.1, 1.8.0
>
> Attachments: chart.png
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> This makes it nearly impossible to run graphs with 100 or more stages. (The 
> execution itself is still sub-second, but the job submission takes 
> increasingly long.)
> I can reproduce this with the following pipeline, which resembles my 
> real-world workloads (with depth up to 10 and width up to, and past, 50). On 
> Flink, going beyond a width of 10 seems problematic (it times out after 
> hours). Note the log scale on the chart for time. 
>  
> {code:java}
>   public static void runPipeline(int depth, int width) throws Exception {
> final ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
> DataSet<String> input = env.fromElements("a", "b", "c");
> DataSet<String> stats = null;
> for (int i = 0; i < depth; i++) {
>   stats = analyze(input, stats, width / (i + 1) + 1);
> }
> stats.writeAsText("out.txt");
> env.execute("depth " + depth + " width " + width);
>   }
>   public static DataSet<String> analyze(DataSet<String> input, 
> DataSet<String> stats, int branches) {
> System.out.println("analyze " + branches);
> for (int i = 0; i < branches; i++) {
>   final int ii = i;
>   if (stats != null) {
> input = input.map(new RichMapFunction<String, String>() {
> @Override
> public void open(Configuration parameters) throws Exception {
>   Collection<String> broadcastSet = 
> getRuntimeContext().getBroadcastVariable("stats");
> }
> @Override
> public String map(String value) throws Exception {
>   return value;
> }
>   }).withBroadcastSet(stats.map(s -> "(" + s + ").map"), "stats");
>   }
>   DataSet<String> branch = input
>.map(s -> new Tuple2(0, s + 
> ii))
>.groupBy(0)
>.minBy(1)
>.map(kv -> kv.f1);
>   if (stats == null) {
> stats = branch;
>   } else {
> stats = stats.union(branch);
>   }
> }
> return stats.map(s -> "(" + s + ").stats");
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] mxm closed pull request #7276: [FLINK-10566] Fix exponential planning time of large programs

2018-12-14 Thread GitBox
mxm closed pull request #7276: [FLINK-10566] Fix exponential planning time of 
large programs
URL: https://github.com/apache/flink/pull/7276
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java 
b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index efbc4fac039..32eed69e2ce 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -23,6 +23,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -361,10 +362,14 @@ public int getMaximumParallelism() {

private static final class MaxDopVisitor implements 
Visitor<Operator<?>> {
 
+   private final Set<Operator<?>> visitedOperators = new HashSet<>();
private int maxDop = -1;
-   
+
@Override
public boolean preVisit(Operator<?> visitable) {
+   if (!visitedOperators.add(visitable)) {
+   return false;
+   }
this.maxDop = Math.max(this.maxDop, 
visitable.getParallelism());
return true;
}
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 22a2a93f066..beb1b65c4a5 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -73,6 +73,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -963,12 +964,16 @@ public Plan createProgramPlan(String jobName, boolean 
clearSinks) {
if (!config.isAutoTypeRegistrationDisabled()) {
plan.accept(new 
Visitor<org.apache.flink.api.common.operators.Operator<?>>() {
 
-   private final HashSet<Class<?>> deduplicator = 
new HashSet<>();
+   private final Set<Class<?>> registeredTypes = 
new HashSet<>();
+   private final 
Set<org.apache.flink.api.common.operators.Operator<?>> visitedOperators = new 
HashSet<>();
 
@Override
public boolean 
preVisit(org.apache.flink.api.common.operators.Operator<?> visitable) {
+   if (!visitedOperators.add(visitable)) {
+   return false;
+   }
OperatorInformation<?> opInfo = 
visitable.getOperatorInfo();
-   
Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, 
deduplicator);
+   
Serializers.recursivelyRegisterType(opInfo.getOutputType(), config, 
registeredTypes);
return true;
}
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java 
b/flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java
new file mode 100644
index 000..6c30af88ad4
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.planning;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import 

[GitHub] mxm commented on issue #7276: [FLINK-10566] Fix exponential planning time of large programs

2018-12-14 Thread GitBox
mxm commented on issue #7276: [FLINK-10566] Fix exponential planning time of 
large programs
URL: https://github.com/apache/flink/pull/7276#issuecomment-447323623
 
 
   All green this time. Merging.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10594) Support SUBSETS

2018-12-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10594:
---
Labels: pull-request-available  (was: )

> Support SUBSETS
> ---
>
> Key: FLINK-10594
> URL: https://issues.apache.org/jira/browse/FLINK-10594
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, Table API  SQL
>Reporter: Dawid Wysakowicz
>Assignee: bupt_ljy
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] buptljy opened a new pull request #7310: [FLINK-10594][table] Add Subsets for CEP

2018-12-14 Thread GitBox
buptljy opened a new pull request #7310: [FLINK-10594][table] Add Subsets for 
CEP
URL: https://github.com/apache/flink/pull/7310
 
 
   ## What is the purpose of the change
   
   - Add subsets for CEP
   
   ## Brief change log
   
   - Add subsets as the parameter of the MatchCodeGenerator
   - Merge subset's specified list results
   
   
   ## Verifying this change
   
   - Unit testing(EventTime/ProcessingTime/Multi Subsets)
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`:no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive):no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not documented
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE

2018-12-14 Thread GitBox
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r241741665
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
 ##
 @@ -168,27 +170,23 @@ object AggregateUtil {
   generateRetraction: Boolean,
   consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = {
 
-val (aggFields, aggregates, isDistinctAggs, accTypes, accSpecs) =
-  transformToAggregateFunctions(
+val aggregateMetadata = extractAggregateMetadata(
 namedAggregates.map(_.getKey),
 inputRowType,
 consumeRetraction,
 tableConfig,
 isStateBackedDataViews = true)
 
-val aggMapping = aggregates.indices.map(_ + groupings.length).toArray
-
-val outputArity = groupings.length + aggregates.length
-
-val aggregationStateType: RowTypeInfo = new RowTypeInfo(accTypes: _*)
+val aggMapping = 
getAdjustedMapping(aggregateMetadata.getAggregateCallsCount, groupings.length)
 
 Review comment:
   Actually I thought that this function could be part of `AggregateMetadata`, because you always apply it to the calls count.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #7177: [FLINK-7599] [table] Support for aggregates in MATCH_RECOGNIZE

2018-12-14 Thread GitBox
twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r241742837
 
 

 ##
 File path: docs/dev/table/streaming/match_recognize.md
 ##
 @@ -211,6 +211,65 @@ If a condition is not defined for a pattern variable, a 
default condition will b
 
 For a more detailed explanation about expressions that can be used in those 
clauses, please have a look at the [event stream 
navigation](#pattern-navigation) section.
 
+### Aggregations
+
+Aggregations can be used in `DEFINE` and `MEASURES` clauses. Both 
[built-in]({{ site.baseurl }}/dev/table/sql.html#built-in-functions) and custom 
[user defined]({{ site.baseurl }}/dev/table/udfs.html) functions are supported.
+
+Aggregate functions are applied to each subset of rows mapped to a match. In 
order to understand how those subsets are evaluated have a look at the [event 
stream navigation](#pattern-navigation) section.
+
+The task of the following example is to find the longest period of time for which the average price of a ticker did not exceed a certain threshold. It shows how expressive `MATCH_RECOGNIZE` can be with aggregations.
+This task can be performed with the following query:
+
+{% highlight sql %}
+SELECT *
+FROM Ticker
+MATCH_RECOGNIZE (
+PARTITION BY symbol
+ORDER BY rowtime
+MEASURES
+FIRST(A.rowtime) AS start_tstamp,
+LAST(A.rowtime) AS end_tstamp,
+AVG(A.price) AS avgPrice
+ONE ROW PER MATCH
+AFTER MATCH SKIP TO FIRST B
+PATTERN (A+ B)
+DEFINE
+A AS AVG(A.price) < 15
+) MR;
+{% endhighlight %}
+
+Given this query and following input values:
+
+{% highlight text %}
+symbol        rowtime               price    tax
+========  ======================  =======  =====
+'ACME'  '01-Apr-11 10:00:00'   12  1
+'ACME'  '01-Apr-11 10:00:01'   17  2
+'ACME'  '01-Apr-11 10:00:02'   13  1
+'ACME'  '01-Apr-11 10:00:03'   16  3
+'ACME'  '01-Apr-11 10:00:04'   25  2
+'ACME'  '01-Apr-11 10:00:05'   2   1
+'ACME'  '01-Apr-11 10:00:06'   4   1
+'ACME'  '01-Apr-11 10:00:07'   10  2
+'ACME'  '01-Apr-11 10:00:08'   15  2
+'ACME'  '01-Apr-11 10:00:09'   25  2
+'ACME'  '01-Apr-11 10:00:10'   30  1
+{% endhighlight %}
+
+The query accumulates events as part of the pattern variable `A` as long as their average price does not exceed `15`. For example, such a limit is first exceeded at `01-Apr-11 10:00:04`.
+The average price exceeds `15` again at `01-Apr-11 10:00:10`. Thus the results for this query will be:
+
+{% highlight text %}
+ symbol       start_tstamp        end_tstamp       avgPrice
+========  ==================  ==================  ==========
+ACME   01-APR-11 10:00:00  01-APR-11 10:00:03 14.5
+ACME   01-APR-11 10:00:04  01-APR-11 10:00:09 13.5
+{% endhighlight %}
+
+Note Aggregations can be applied to expressions, but only if they reference a single pattern variable. Thus `SUM(A.price * A.tax)` is valid, but `AVG(A.price * B.tax)` is not.
+
+Attention `DISTINCT` aggregations are 
not supported.
 
 Review comment:
   Can you open a follow-up issue for this if one does not exist yet?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11161) Unable to import java packages in scala-shell

2018-12-14 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721350#comment-16721350
 ] 

Timo Walther commented on FLINK-11161:
--

CC [~sunjincheng121]

> Unable to import java packages in scala-shell
> -
>
> Key: FLINK-11161
> URL: https://issues.apache.org/jira/browse/FLINK-11161
> Project: Flink
>  Issue Type: Improvement
>  Components: Scala Shell
>Affects Versions: 1.8.0
>Reporter: Jeff Zhang
>Assignee: shuiqiangchen
>Priority: Minor
>  Labels: pull-request-available
> Attachments: 09_33_31__12_14_2018.jpg
>
>  Time Spent: 0.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2018-12-14 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721348#comment-16721348
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-11046:
-

I propose the following fix:

The {{RequestIndexer}} instance provided to the {{ActionRequestFailureHandler}} 
shouldn't be the same instance as the one used for indexing incoming records 
(i.e. the one used in {{invoke}} method of the sink).

Instead, it should be a separate instance, which buffers any requests that the 
user attempts to re-index in the failure handler.
In {{invoke}}, before processing the next element, we always check if there are buffered requests from the failure handler that need to be added to the actual request indexer.
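
A rough sketch of that idea (hypothetical class and method names; the only assumed API is the {{RequestIndexer#add(ActionRequest...)}} signature quoted in the issue description):

{code:java}
// Sketch only, not actual connector code: a buffering indexer handed to
// ActionRequestFailureHandler.onFailure() instead of the real one, so that
// re-added requests never touch the locked BulkProcessor.
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.ActionRequest;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class BufferingRequestIndexer implements RequestIndexer {

	private final Queue<ActionRequest> buffer = new ConcurrentLinkedQueue<>();

	@Override
	public void add(ActionRequest... actionRequests) {
		for (ActionRequest request : actionRequests) {
			buffer.add(request);
		}
	}

	// Called from invoke() before the next record is processed, i.e. outside of any
	// BulkProcessor flush callback, so adding to the real indexer is safe here.
	void drainTo(RequestIndexer realIndexer) {
		ActionRequest request;
		while ((request = buffer.poll()) != null) {
			realIndexer.add(request);
		}
	}
}
{code}

In {{invoke()}}, the sink would then first call something like {{failureRequestIndexer.drainTo(mainRequestIndexer)}} and only afterwards add the incoming record.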

[~luoguohao] you mentioned that you would want to try fixing this. Do you want 
to take a try on this?

> ElasticSearch6Connector cause thread blocked when index failed with retry
> -
>
> Key: FLINK-11046
> URL: https://issues.apache.org/jira/browse/FLINK-11046
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector
>Affects Versions: 1.6.2
>Reporter: luoguohao
>Priority: Major
>
> When i'm using es6 sink to index into es, bulk process with some exception 
> catched, and  i trying to reindex the document with the call 
> `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` 
> method, but things goes incorrect. The call thread stuck there, and with the 
> thread dump, i saw the `bulkprocessor` object was locked by other thread. 
> {code:java}
> public interface ActionRequestFailureHandler extends Serializable {
>  void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
> RequestIndexer indexer) throws Throwable;
> }
> {code}
> After i read the code implemented in the `indexer.add(action)`, i find that 
> `synchronized` is needed on each add operation.
> {code:java}
> private synchronized void internalAdd(DocWriteRequest request, @Nullable 
> Object payload) {
>   ensureOpen();
>   bulkRequest.add(request, payload);
>   executeIfNeeded();
> }
> {code}
> And, at i also noticed that `bulkprocessor` object would also locked in the 
> bulk process thread. 
> the bulk process operation is in the following code:
> {code:java}
> public void execute(BulkRequest bulkRequest, long executionId) {
> Runnable toRelease = () -> {};
> boolean bulkRequestSetupSuccessful = false;
> try {
> listener.beforeBulk(executionId, bulkRequest);
> semaphore.acquire();
> toRelease = semaphore::release;
> CountDownLatch latch = new CountDownLatch(1);
> retry.withBackoff(consumer, bulkRequest, new 
> ActionListener() {
> @Override
> public void onResponse(BulkResponse response) {
> try {
> listener.afterBulk(executionId, bulkRequest, response);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> @Override
> public void onFailure(Exception e) {
> try {
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> }, Settings.EMPTY);
> bulkRequestSetupSuccessful = true;
>if (concurrentRequests == 0) {
>latch.await();
> }
> } catch (InterruptedException e) {
> Thread.currentThread().interrupt();
> logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
> cancelled.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } catch (Exception e) {
> logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
> request {}.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> if (bulkRequestSetupSuccessful == false) {  // if we fail on 
> client.bulk() release the semaphore
> toRelease.run();
> }
> }
> }
> {code}
> As the read line i marked above, i think, that's the reason why the retry 
> operation thread was block, because the the bulk process thread never release 
> the lock on `bulkprocessor`.  and, i also trying to figure out why the field 
> `concurrentRequests` was set to zero. And i saw the the initialize for 
> bulkprocessor in class `ElasticsearchSinkBase`:
> {code:java}
> protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
>  ...
>  BulkProcessor.Builder bulkProcessorBuilder =  
> callBridge.createBulkProcessorBuilder(client, listener);
>  // This makes flush() blocking
>  

[jira] [Commented] (FLINK-11067) Port TableEnvironments to Java

2018-12-14 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721330#comment-16721330
 ] 

Timo Walther commented on FLINK-11067:
--

Sorry, [~dian.fu] and [~sunjincheng121] I forgot that {{BatchTableEnvironment}} 
and {{StreamTableEnvironment}} are also in different packages. Yes, I think 
[~dawidwys] suggestion is a better solution with a common base class in 
{{flink-table-api-base}}. In summary:

// in flink-tabe-api-base
interface BaseTableEnvironment {

// methods independent of batch/streaming or Java/Scala

// contains main logic for within the table ecosystem
// (reading from table sources and writing to table sinks)
}

// in flink-table-api-java package
interface TableEnvironment extends BaseTableEnvironment{

// only methods for creating a table environment in Java!

static BatchTableEnvironment getTableEnvironment(ExecutionEnvironment 
env);

static StreamTableEnvironment 
getTableEnvironment(StreamExecutionEnvironment env);

// and methods specific for Java i.e. UDF registration
}
interface BatchTableEnvironment extends TableEnvironment {
// methods specific for batch and Java (toDataSet, fromDataSet)
}
interface StreamTableEnvironment extends TableEnvironment {
// methods specific for streaming and Java (toRetractStream, ...)
}

// in flink-table-api-scala package
interface TableEnvironment extends BaseTableEnvironment {

// only methods for creating a table environment in Scala!

static BatchTableEnvironment getTableEnvironment(ExecutionEnvironment 
env);

static StreamTableEnvironment 
getTableEnvironment(StreamExecutionEnvironment env);

// and methods specific for Scala i.e. UDF registration
}
interface BatchTableEnvironment extends TableEnvironment {
// methods specific for batch and Scala (toDataSet, fromDataSet)
}
interface StreamTableEnvironment extends TableEnvironment {
// methods specific for streaming and Scala (toRetractStream, ...)
}

What do you think?

> Port TableEnvironments to Java
> --
>
> Key: FLINK-11067
> URL: https://issues.apache.org/jira/browse/FLINK-11067
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> This task includes porting {{TableEnvironment}}, {{StreamTableEnvironment}}, 
> {{BatchTableEnvironment}} to Java. API-breaking changes need to be avoided 
> and discussed. Some refactoring and clean up might be necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] wangyang0918 commented on issue #7144: [FLINK-10932] [ResourceManager] Initial flink-kubernetes module with empty implementation

2018-12-14 Thread GitBox
wangyang0918 commented on issue #7144: [FLINK-10932] [ResourceManager] Initial 
flink-kubernetes module with empty implementation
URL: https://github.com/apache/flink/pull/7144#issuecomment-447307319
 
 
   @isunjin It's great to start the Kubernetes integration with Flink. I am glad to get involved and can help by giving some suggestions based on our use case.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-11067) Port TableEnvironments to Java

2018-12-14 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721330#comment-16721330
 ] 

Timo Walther edited comment on FLINK-11067 at 12/14/18 12:07 PM:
-

Sorry, [~dian.fu] and [~sunjincheng121] I forgot that {{BatchTableEnvironment}} 
and {{StreamTableEnvironment}} are also in different packages. Yes, I think 
[~dawidwys] suggestion is a better solution with a common base class in 
{{flink-table-api-base}}. In summary:

{code}
// in flink-tabe-api-base
interface BaseTableEnvironment {

// methods independent of batch/streaming or Java/Scala

// contains main logic for within the table ecosystem
// (reading from table sources and writing to table sinks)
}

// in flink-table-api-java package
interface TableEnvironment extends BaseTableEnvironment{

// only methods for creating a table environment in Java!

static BatchTableEnvironment getTableEnvironment(ExecutionEnvironment 
env);

static StreamTableEnvironment 
getTableEnvironment(StreamExecutionEnvironment env);

// and methods specific for Java i.e. UDF registration
}
interface BatchTableEnvironment extends TableEnvironment {
// methods specific for batch and Java (toDataSet, fromDataSet)
}
interface StreamTableEnvironment extends TableEnvironment {
// methods specific for streaming and Java (toRetractStream, ...)
}

// in flink-table-api-scala package
interface TableEnvironment extends BaseTableEnvironment {

// only methods for creating a table environment in Scala!

static BatchTableEnvironment getTableEnvironment(ExecutionEnvironment 
env);

static StreamTableEnvironment 
getTableEnvironment(StreamExecutionEnvironment env);

// and methods specific for Scala i.e. UDF registration
}
interface BatchTableEnvironment extends TableEnvironment {
// methods specific for batch and Scala (toDataSet, fromDataSet)
}
interface StreamTableEnvironment extends TableEnvironment {
// methods specific for streaming and Scala (toRetractStream, ...)
}
{code}

What do you think?


was (Author: twalthr):
Sorry, [~dian.fu] and [~sunjincheng121] I forgot that {{BatchTableEnvironment}} 
and {{StreamTableEnvironment}} are also in different packages. Yes, I think 
[~dawidwys] suggestion is a better solution with a common base class in 
{{flink-table-api-base}}. In summary:

// in flink-tabe-api-base
interface BaseTableEnvironment {

// methods independent of batch/streaming or Java/Scala

// contains main logic for within the table ecosystem
// (reading from table sources and writing to table sinks)
}

// in flink-table-api-java package
interface TableEnvironment extends BaseTableEnvironment{

// only methods for creating a table environment in Java!

static BatchTableEnvironment getTableEnvironment(ExecutionEnvironment 
env);

static StreamTableEnvironment 
getTableEnvironment(StreamExecutionEnvironment env);

// and methods specific for Java i.e. UDF registration
}
interface BatchTableEnvironment extends TableEnvironment {
// methods specific for batch and Java (toDataSet, fromDataSet)
}
interface StreamTableEnvironment extends TableEnvironment {
// methods specific for streaming and Java (toRetractStream, ...)
}

// in flink-table-api-scala package
interface TableEnvironment extends BaseTableEnvironment {

// only methods for creating a table environment in Scala!

static BatchTableEnvironment getTableEnvironment(ExecutionEnvironment 
env);

static StreamTableEnvironment 
getTableEnvironment(StreamExecutionEnvironment env);

// and methods specific for Scala i.e. UDF registration
}
interface BatchTableEnvironment extends TableEnvironment {
// methods specific for batch and Scala (toDataSet, fromDataSet)
}
interface StreamTableEnvironment extends TableEnvironment {
// methods specific for streaming and Scala (toRetractStream, ...)
}

What do you think?

> Port TableEnvironments to Java
> --
>
> Key: FLINK-11067
> URL: https://issues.apache.org/jira/browse/FLINK-11067
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> This task includes porting {{TableEnvironment}}, {{StreamTableEnvironment}}, 
> {{BatchTableEnvironment}} to Java. API-breaking changes need to be avoided 
> and discussed. Some refactoring and clean up might be necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol closed pull request #7309: Release 1.7

2018-12-14 Thread GitBox
zentol closed pull request #7309: Release 1.7
URL: https://github.com/apache/flink/pull/7309
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] klion26 edited a comment on issue #7309: Release 1.7

2018-12-14 Thread GitBox
klion26 edited a comment on issue #7309: Release 1.7
URL: https://github.com/apache/flink/pull/7309#issuecomment-447302388
 
 
   Hi @westkiller, this PR would merge 232 commits from release-1.7 into master; maybe we should close it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2018-12-14 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721314#comment-16721314
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-11046:
-

Ah, I think I understand the deadlock now.

It's because we're calling the user-provided {{ActionRequestFailureHandler.onFailure(...)}} inside the {{afterBulk}} callback. The lock is only released when the {{afterBulk}} method returns.

So, the deadlock is:
1. The {{BulkProcessor}} flushes, and indexing of one of the documents fails, which invokes the user's {{ActionRequestFailureHandler.onFailure(...)}}. At this point, the lock on the {{BulkProcessor}} isn't released yet, because the {{onFailure}} call is part of the bulk processor's flush callback.
2. Within {{ActionRequestFailureHandler.onFailure(...)}}, in your case, you added some new documents to be indexed. Upon adding, the {{BulkProcessor}} tries to flush again, but the lock hasn't been released yet, hence the deadlock.
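
For illustration, a failure handler in this scenario effectively does the following (sketch only; the class name is hypothetical, the interface signature is the one quoted in the issue description):

{code:java}
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;

import org.elasticsearch.action.ActionRequest;

// Illustration of the deadlocking call pattern described above, not actual connector code.
// onFailure() runs inside the BulkProcessor's afterBulk() flush callback, i.e. while the
// in-flight bulk request of this processor has not completed yet.
public class ReindexingFailureHandler implements ActionRequestFailureHandler {

	@Override
	public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) {
		// add() -> synchronized internalAdd() -> executeIfNeeded() may trigger another
		// flush; that flush blocks (on the semaphore/latch in BulkRequestHandler) until
		// the in-flight bulk completes, but the in-flight bulk only completes once this
		// afterBulk()/onFailure() callback returns -> deadlock.
		indexer.add(action);
	}
}
{code}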

So, the re-indexing thread (i.e. the async callback) should have been blocked 
on:
[https://github.com/elastic/elasticsearch/blob/v6.3.1/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java#L60]

While the main task thread should have been blocked on:
[https://github.com/elastic/elasticsearch/blob/v6.3.1/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java#L86]

 

Could you confirm this and see if the analysis makes sense to you?
[~luoguohao]

> ElasticSearch6Connector cause thread blocked when index failed with retry
> -
>
> Key: FLINK-11046
> URL: https://issues.apache.org/jira/browse/FLINK-11046
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector
>Affects Versions: 1.6.2
>Reporter: luoguohao
>Priority: Major
>
> When i'm using es6 sink to index into es, bulk process with some exception 
> catched, and  i trying to reindex the document with the call 
> `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` 
> method, but things goes incorrect. The call thread stuck there, and with the 
> thread dump, i saw the `bulkprocessor` object was locked by other thread. 
> {code:java}
> public interface ActionRequestFailureHandler extends Serializable {
>  void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
> RequestIndexer indexer) throws Throwable;
> }
> {code}
> After i read the code implemented in the `indexer.add(action)`, i find that 
> `synchronized` is needed on each add operation.
> {code:java}
> private synchronized void internalAdd(DocWriteRequest request, @Nullable 
> Object payload) {
>   ensureOpen();
>   bulkRequest.add(request, payload);
>   executeIfNeeded();
> }
> {code}
> And, at i also noticed that `bulkprocessor` object would also locked in the 
> bulk process thread. 
> the bulk process operation is in the following code:
> {code:java}
> public void execute(BulkRequest bulkRequest, long executionId) {
> Runnable toRelease = () -> {};
> boolean bulkRequestSetupSuccessful = false;
> try {
> listener.beforeBulk(executionId, bulkRequest);
> semaphore.acquire();
> toRelease = semaphore::release;
> CountDownLatch latch = new CountDownLatch(1);
> retry.withBackoff(consumer, bulkRequest, new 
> ActionListener() {
> @Override
> public void onResponse(BulkResponse response) {
> try {
> listener.afterBulk(executionId, bulkRequest, response);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> @Override
> public void onFailure(Exception e) {
> try {
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> }, Settings.EMPTY);
> bulkRequestSetupSuccessful = true;
>if (concurrentRequests == 0) {
>latch.await();
> }
> } catch (InterruptedException e) {
> Thread.currentThread().interrupt();
> logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
> cancelled.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } catch (Exception e) {
> logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
> request {}.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> if (bulkRequestSetupSuccessful == false) {  // if we fail on 
> client.bulk() release the semaphore
> toRelease.run();
> }
> }
> }
> {code}

[jira] [Closed] (FLINK-11040) Incorrect generic type of DataStream in broadcast_state.md

2018-12-14 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz closed FLINK-11040.

   Resolution: Fixed
Fix Version/s: 1.8.0

Fixed via: bc4194ab756e86c5883b272e97f6825d7d5e9ba2

> Incorrect generic type of DataStream in broadcast_state.md
> --
>
> Key: FLINK-11040
> URL: https://issues.apache.org/jira/browse/FLINK-11040
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.7.0
>Reporter: Yangze Guo
>Assignee: Yangze Guo
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> In this 
> [document|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/broadcast_state.html#provided-apis],
>  the generic type of the output DataStream is Match. According to my
> understanding of the code, I think it should be String, as that is the return
> type of the last operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] klion26 commented on issue #7309: Release 1.7

2018-12-14 Thread GitBox
klion26 commented on issue #7309: Release 1.7
URL: https://github.com/apache/flink/pull/7309#issuecomment-447302388
 
 
   @westkiller Why do you want to create this pr?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dawidwys closed pull request #7205: [FLINK-11040][docs] fixed the Incorrect generic type in broadcast_state.md

2018-12-14 Thread GitBox
dawidwys closed pull request #7205: [FLINK-11040][docs] fixed the Incorrect 
generic type in broadcast_state.md
URL: https://github.com/apache/flink/pull/7205
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/stream/state/broadcast_state.md 
b/docs/dev/stream/state/broadcast_state.md
index f336a855a3a..628a6308b57 100644
--- a/docs/dev/stream/state/broadcast_state.md
+++ b/docs/dev/stream/state/broadcast_state.md
@@ -94,7 +94,7 @@ The exact type of the function depends on the type of the 
non-broadcasted stream
 
 
 {% highlight java %}
-DataStream<Match> output = colorPartitionedStream
+DataStream<String> output = colorPartitionedStream
  .connect(ruleBroadcastStream)
  .process(
  
@@ -107,7 +107,7 @@ DataStream output = colorPartitionedStream
  new KeyedBroadcastProcessFunction() {
  // my matching logic
  }
- )
+ );
 {% endhighlight %}
 
 ### BroadcastProcessFunction and KeyedBroadcastProcessFunction


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11046) ElasticSearch6Connector cause thread blocked when index failed with retry

2018-12-14 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-11046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16721296#comment-16721296
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-11046:
-

This seems a bit odd.

While concurrent requests is indeed set to 0, so that only a single bulk
request is allowed to execute at a time and new index additions are blocked
during that process, the lock should be released once the bulk request
finishes, un-blocking new index additions.

After all, the {{BulkProcessor}} is supposed to be thread-safe: 
[http://javadoc.kyubu.de/elasticsearch/HEAD/org/elasticsearch/action/bulk/BulkProcessor.html.]

> ElasticSearch6Connector cause thread blocked when index failed with retry
> -
>
> Key: FLINK-11046
> URL: https://issues.apache.org/jira/browse/FLINK-11046
> Project: Flink
>  Issue Type: Bug
>  Components: ElasticSearch Connector
>Affects Versions: 1.6.2
>Reporter: luoguohao
>Priority: Major
>
> When i'm using es6 sink to index into es, bulk process with some exception 
> catched, and  i trying to reindex the document with the call 
> `indexer.add(action)` in the `ActionRequestFailureHandler.onFailure()` 
> method, but things goes incorrect. The call thread stuck there, and with the 
> thread dump, i saw the `bulkprocessor` object was locked by other thread. 
> {code:java}
> public interface ActionRequestFailureHandler extends Serializable {
>  void onFailure(ActionRequest action, Throwable failure, int restStatusCode, 
> RequestIndexer indexer) throws Throwable;
> }
> {code}
> After i read the code implemented in the `indexer.add(action)`, i find that 
> `synchronized` is needed on each add operation.
> {code:java}
> private synchronized void internalAdd(DocWriteRequest request, @Nullable 
> Object payload) {
>   ensureOpen();
>   bulkRequest.add(request, payload);
>   executeIfNeeded();
> }
> {code}
> And, at i also noticed that `bulkprocessor` object would also locked in the 
> bulk process thread. 
> the bulk process operation is in the following code:
> {code:java}
> public void execute(BulkRequest bulkRequest, long executionId) {
> Runnable toRelease = () -> {};
> boolean bulkRequestSetupSuccessful = false;
> try {
> listener.beforeBulk(executionId, bulkRequest);
> semaphore.acquire();
> toRelease = semaphore::release;
> CountDownLatch latch = new CountDownLatch(1);
> retry.withBackoff(consumer, bulkRequest, new 
> ActionListener() {
> @Override
> public void onResponse(BulkResponse response) {
> try {
> listener.afterBulk(executionId, bulkRequest, response);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> @Override
> public void onFailure(Exception e) {
> try {
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> semaphore.release();
> latch.countDown();
> }
> }
> }, Settings.EMPTY);
> bulkRequestSetupSuccessful = true;
>if (concurrentRequests == 0) {
>latch.await();
> }
> } catch (InterruptedException e) {
> Thread.currentThread().interrupt();
> logger.info(() -> new ParameterizedMessage("Bulk request {} has been 
> cancelled.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } catch (Exception e) {
> logger.warn(() -> new ParameterizedMessage("Failed to execute bulk 
> request {}.", executionId), e);
> listener.afterBulk(executionId, bulkRequest, e);
> } finally {
> if (bulkRequestSetupSuccessful == false) {  // if we fail on 
> client.bulk() release the semaphore
> toRelease.run();
> }
> }
> }
> {code}
> As the read line i marked above, i think, that's the reason why the retry 
> operation thread was block, because the the bulk process thread never release 
> the lock on `bulkprocessor`.  and, i also trying to figure out why the field 
> `concurrentRequests` was set to zero. And i saw the the initialize for 
> bulkprocessor in class `ElasticsearchSinkBase`:
> {code:java}
> protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
>  ...
>  BulkProcessor.Builder bulkProcessorBuilder =  
> callBridge.createBulkProcessorBuilder(client, listener);
>  // This makes flush() blocking
>  bulkProcessorBuilder.setConcurrentRequests(0);
>  
>  ...
>  return bulkProcessorBuilder.build();
> }
> {code}
>  this field value was set to zero explicitly. So, all things seems to make 
> sense, but i still 

[GitHub] zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use InputDependencyConstraint to avoid resource dead…

2018-12-14 Thread GitBox
zhuzhurk commented on a change in pull request #7255: [FLINK-10945] Use 
InputDependencyConstraint to avoid resource dead…
URL: https://github.com/apache/flink/pull/7255#discussion_r241636524
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ##
 @@ -755,23 +775,10 @@ else if (numConsumers == 0) {
// TODO The current approach may send many 
update messages even though the consuming
// task has already been deployed with all 
necessary information. We have to check
// whether this is a problem and fix it, if it 
is.
-   CompletableFuture.supplyAsync(
-   () -> {
-   try {
-   final ExecutionGraph 
executionGraph = consumerVertex.getExecutionGraph();
-   
consumerVertex.scheduleForExecution(
-   
executionGraph.getSlotProvider(),
-   
executionGraph.isQueuedSchedulingAllowed(),
-   
LocationPreferenceConstraint.ANY, // there must be at least one known location
-   
Collections.emptySet());
-   } catch (Throwable t) {
-   consumerVertex.fail(new 
IllegalStateException("Could not schedule consumer " +
-   "vertex 
" + consumerVertex, t));
-   }
-
-   return null;
-   },
-   executor);
+   if 
(consumerVertex.checkInputDependencyConstraints()) {
 
 Review comment:
   From my understanding, the TODO comment is related to the "`consumerState == CREATED`" section in `scheduleOrUpdateConsumers`, which invokes `cachePartitionInfo` first and then schedules the vertex. The `cachePartitionInfo` call is needed to avoid a deployment race, at the cost of sending redundant partition infos to the task, which is the concern described in the TODO comment.
   
   So far the redundant partition info is not a big problem, but I think we can optimize it later. One possible solution in my mind is to remove known partition infos from the cache when creating the InputChannelDeploymentDescriptor.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zzchun commented on a change in pull request #7291: [hotfix] [docs] Display Connectors and Formats tables, and fix typo in documentation

2018-12-14 Thread GitBox
zzchun commented on a change in pull request #7291: [hotfix] [docs] Display 
Connectors and Formats tables, and fix typo in documentation
URL: https://github.com/apache/flink/pull/7291#discussion_r241627373
 
 

 ##
 File path: docs/dev/table/connect.md
 ##
 @@ -58,11 +56,7 @@ The following table list all available connectors and 
formats. Their mutual comp
 | JSON  | `flink-json` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar)
 |
 | Apache Avro   | `flink-avro` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar)
 |
 
-{% else %}
-
-This table is only available for stable releases.
-
-{% endif %}
 
 Review comment:
   @klion26 Thank you for your review. 
   
   As shown in the `Dependencies` section of the [Connect to External 
Systems](https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html)
 page, the `Connectors` and `Formats` tables are described, but we cannot see 
these tables, which is confusing. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] westkiller opened a new pull request #7309: Release 1.7

2018-12-14 Thread GitBox
westkiller opened a new pull request #7309: Release 1.7
URL: https://github.com/apache/flink/pull/7309
 
 
   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-9142) Lower the minimum number of buffers for incoming channels to 1

2018-12-14 Thread boshu Zheng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

boshu Zheng reassigned FLINK-9142:
--

Assignee: boshu Zheng

> Lower the minimum number of buffers for incoming channels to 1
> --
>
> Key: FLINK-9142
> URL: https://issues.apache.org/jira/browse/FLINK-9142
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: boshu Zheng
>Priority: Major
> Fix For: 1.6.3, 1.7.2, 1.8.0
>
>
> Even if we make the floating buffers optional, we still require
> {{taskmanager.network.memory.buffers-per-channel}} (exclusive) buffers per
> incoming channel with credit-based flow control, whereas without it the
> minimum was 1 and only the maximum number of buffers was influenced by this
> parameter.
> {{taskmanager.network.memory.buffers-per-channel}} is set to {{2}} by default
> with the argument that this way we will have one buffer available for netty
> to process while a worker thread is processing/deserializing the other
> buffer. While this seems reasonable, it does increase our minimum
> requirements. Instead, we could probably live with {{1}} exclusive buffer and
> up to {{gate.getNumberOfInputChannels() * (networkBuffersPerChannel - 1) +
> extraNetworkBuffersPerGate}} floating buffers. That way we will have the same
> memory footprint as before with only slightly changed behaviour.
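
A quick back-of-the-envelope example of what the proposal means for the buffer budget (example values only; assumes the default of 2 buffers per channel stated above and an assumed default of 8 floating buffers per gate):

{code:java}
public class BufferBudgetExample {
	public static void main(String[] args) {
		// Example values: a single input gate with 100 incoming channels.
		int numberOfInputChannels = 100;
		int networkBuffersPerChannel = 2;   // taskmanager.network.memory.buffers-per-channel (default assumed)
		int extraNetworkBuffersPerGate = 8; // floating buffers per gate (default assumed)

		// Current behaviour: 2 exclusive buffers are required per incoming channel.
		int requiredExclusiveToday = numberOfInputChannels * networkBuffersPerChannel; // 200

		// Proposal: only 1 exclusive buffer per channel as the hard minimum ...
		int requiredExclusiveProposed = numberOfInputChannels; // 100
		// ... and the rest only as an upper bound of optional floating buffers.
		int maxFloatingBuffers = numberOfInputChannels * (networkBuffersPerChannel - 1)
				+ extraNetworkBuffersPerGate; // 100 * 1 + 8 = 108

		System.out.println(requiredExclusiveToday + " exclusive buffers today vs. "
				+ requiredExclusiveProposed + " required plus up to "
				+ maxFloatingBuffers + " floating buffers under the proposal");
	}
}
{code}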



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

2018-12-14 Thread GitBox
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement 
in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#discussion_r241457640
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeySerializationUtils.java
 ##
 @@ -80,8 +82,12 @@ static int readKeyGroup(int keyGroupPrefixBytes, 
DataInputView inputView) throws
}
}
 
+   public static boolean isSerializerTypeVariableSized(@Nonnull 
TypeSerializer serializer) {
 
 Review comment:
   This looks like something for `TypeSerializerUtils`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

2018-12-14 Thread GitBox
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement 
in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#discussion_r241465279
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
 ##
 @@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+
+/**
+ * Responsible for serialization of currentKey, currentGroup and namespace.
+ * Will reuse the previous serialized currentKeyed if possible.
+ * @param  type of the key.
+ */
+@NotThreadSafe
+@Internal
+class RocksDBSerializedCompositeKeyBuilder {
+
+   /** The serializer for the key. */
+   @Nonnull
+   private final TypeSerializer keySerializer;
+
+   /** The output to write the key into. */
+   @Nonnull
+   private final DataOutputSerializer keyOutView;
+
+   /** The number of Key-group-prefix bytes for the key. */
+   @Nonnegative
+   private final int keyGroupPrefixBytes;
+
+   /** This flag indicates whether the key type has a variable byte size 
in serialization. */
+   private final boolean keySerializerTypeVariableSized;
+
+   /** Mark for the position after the serialized key. */
+   @Nonnegative
+   private int afterKeyMark;
+
+   public RocksDBSerializedCompositeKeyBuilder(
 
 Review comment:
   If we want to keep the class package-private, do we need `public` for the constructor/methods?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

2018-12-14 Thread GitBox
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement 
in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#discussion_r241456301
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
 ##
 @@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+
+/**
+ * Responsible for serialization of currentKey, currentGroup and namespace.
+ * Will reuse the previous serialized currentKeyed if possible.
 
 Review comment:
   I guess: currentKeyed -> currentKey


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

2018-12-14 Thread GitBox
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement 
in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#discussion_r241709684
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ##
 @@ -129,30 +126,78 @@ public void setCurrentNamespace(N namespace) {
final TypeSerializer safeNamespaceSerializer,
final TypeSerializer safeValueSerializer) throws 
Exception {
 
-   Preconditions.checkNotNull(serializedKeyAndNamespace);
 
 Review comment:
   Also here: `@Nonnull` for the function args?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

2018-12-14 Thread GitBox
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement 
in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#discussion_r241709422
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ##
 @@ -100,26 +100,23 @@ protected AbstractRocksDBState(
 
this.dataOutputView = new DataOutputSerializer(128);
this.dataInputView = new DataInputDeserializer();
-   this.ambiguousKeyPossible =
-   
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), 
namespaceSerializer);
+   this.sharedKeyNamespaceSerializer = 
backend.getSharedRocksKeyBuilder();
}
 
// 

 
@Override
public void clear() {
try {
-   writeCurrentKeyWithGroupAndNamespace();
-   byte[] key = dataOutputView.getCopyOfBuffer();
-   backend.db.delete(columnFamily, writeOptions, key);
-   } catch (IOException | RocksDBException e) {
+   backend.db.delete(columnFamily, writeOptions, 
serializeCurrentKeyWithGroupAndNamespace());
+   } catch (RocksDBException e) {
throw new FlinkRuntimeException("Error while removing 
entry from RocksDB", e);
}
}
 
@Override
public void setCurrentNamespace(N namespace) {
-   this.currentNamespace = Preconditions.checkNotNull(namespace, 
"Namespace");
+   this.currentNamespace = namespace;
 
 Review comment:
   Allowing `null` namespace or just missing @Nonnull for function arg?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

2018-12-14 Thread GitBox
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement 
in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#discussion_r241707867
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
 ##
 @@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Test for @{@link RocksDBSerializedCompositeKeyBuilder}.
+ */
+public class RocksDBSerializedCompositeKeyBuilderTest {
+
+   private final DataOutputSerializer dataOutputSerializer = new 
DataOutputSerializer(128);
+
+   private static final int[] TEST_PARALLELISMS = new int[]{64, 4096};
+   private static final Collection TEST_INTS = Arrays.asList(42, 
4711);
+   private static final Collection TEST_STRINGS = 
Arrays.asList("test123", "abc");
+
+   @Before
+   public void before() {
+   dataOutputSerializer.clear();
+   }
+
+   @Test
+   public void testSetKey() throws IOException {
+   for (int parallelism : TEST_PARALLELISMS) {
+   testSetKeyInternal(IntSerializer.INSTANCE, TEST_INTS, 
parallelism);
+   testSetKeyInternal(StringSerializer.INSTANCE, 
TEST_STRINGS, parallelism);
+   }
+   }
+
+   @Test
+   public void testSetKeyNamespace() throws IOException {
+   for (int parallelism : TEST_PARALLELISMS) {
+   testSetKeyNamespaceInternal(IntSerializer.INSTANCE, 
IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, parallelism);
 
 Review comment:
   Maybe a small improvement:
   We could have Tuple2s of serializer and test collection,
   and nested loops to iterate over the possible combinations of Tuple2s for the key/namespace/user-key cases.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

2018-12-14 Thread GitBox
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement 
in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#discussion_r241699307
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
 ##
 @@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+
+/**
+ * Responsible for serialization of currentKey, currentGroup and namespace.
+ * Will reuse the previous serialized currentKeyed if possible.
+ * @param  type of the key.
+ */
+@NotThreadSafe
+@Internal
+class RocksDBSerializedCompositeKeyBuilder {
+
+   /** The serializer for the key. */
+   @Nonnull
+   private final TypeSerializer keySerializer;
+
+   /** The output to write the key into. */
+   @Nonnull
+   private final DataOutputSerializer keyOutView;
+
+   /** The number of Key-group-prefix bytes for the key. */
+   @Nonnegative
+   private final int keyGroupPrefixBytes;
+
+   /** This flag indicates whether the key type has a variable byte size 
in serialization. */
+   private final boolean keySerializerTypeVariableSized;
+
+   /** Mark for the position after the serialized key. */
+   @Nonnegative
+   private int afterKeyMark;
+
+   public RocksDBSerializedCompositeKeyBuilder(
+   @Nonnull TypeSerializer<K> keySerializer,
+   @Nonnegative int keyGroupPrefixBytes,
+   @Nonnegative int initialSize) {
+   this(
+   keySerializer,
+   new DataOutputSerializer(initialSize),
+   keyGroupPrefixBytes,
+   
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
+   0);
+   }
+
+   @VisibleForTesting
+   RocksDBSerializedCompositeKeyBuilder(
+   @Nonnull TypeSerializer<K> keySerializer,
+   @Nonnull DataOutputSerializer keyOutView,
+   @Nonnegative int keyGroupPrefixBytes,
+   boolean keySerializerTypeVariableSized,
+   @Nonnegative int afterKeyMark) {
+   this.keySerializer = keySerializer;
+   this.keyOutView = keyOutView;
+   this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+   this.keySerializerTypeVariableSized = 
keySerializerTypeVariableSized;
+   this.afterKeyMark = afterKeyMark;
+   }
+
+   /**
+* Sets the key and key-group as prefix. This will serialize them into 
the buffer and the will be used to create
+* composite keys with provided namespaces.
+*
+* @param keythe key.
+* @param keyGroupId the key-group id for the key.
+*/
+   public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int 
keyGroupId) {
+   try {
+   serializeKeyGroupAndKey(key, keyGroupId);
+   } catch (IOException shouldNeverHappen) {
+   throw new FlinkRuntimeException(shouldNeverHappen);
+   }
+   }
+
+   /**
+* Returns a serialized composite key, from the key and key-group 
provided in a previous call to
+* {@link #setKeyAndKeyGroup(Object, int)} and the given namespace.
+*
+* @param namespace   the namespace to concatenate for the 
serialized composite key bytes.
+* @param namespaceSerializer the serializer to obtain the serialized 
form of the namespace.
+* @param <N> the type of the namespace.
+* @return the bytes for the serialized composite key of 

[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

2018-12-14 Thread GitBox
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement 
in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#discussion_r241453092
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
 ##
 @@ -350,6 +352,11 @@ public void write(DataInputView source, int numBytes) 
throws IOException {
this.position += numBytes;
}
 
+   public void setPosition(int position) {
+   Preconditions.checkArgument(position >= 0 && position <= 
this.position, "Position out of bounds.");
 
 Review comment:
   should it not be:
   `Preconditions.checkArgument(position >= 0 && position < buffer.length ...)`?
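   
   For illustration, the two checks allow different things: the current one only accepts restoring a position at or before the bytes already written (mark/reset style), while the suggested one would accept any position inside the allocated buffer. A sketch of the suggested variant (field names as used elsewhere in this class, not the actual patch):
   
   public void setPosition(int position) {
      // accept any position inside the currently allocated buffer
      Preconditions.checkArgument(position >= 0 && position < this.buffer.length, "Position out of bounds.");
      this.position = position;
   }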


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

2018-12-14 Thread GitBox
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement 
in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#discussion_r241708059
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilderTest.java
 ##
 @@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Test for {@link RocksDBSerializedCompositeKeyBuilder}.
+ */
+public class RocksDBSerializedCompositeKeyBuilderTest {
+
+   private final DataOutputSerializer dataOutputSerializer = new 
DataOutputSerializer(128);
+
+   private static final int[] TEST_PARALLELISMS = new int[]{64, 4096};
+   private static final Collection<Integer> TEST_INTS = Arrays.asList(42, 4711);
+   private static final Collection<String> TEST_STRINGS = Arrays.asList("test123", "abc");
+
+   @Before
+   public void before() {
+   dataOutputSerializer.clear();
+   }
+
+   @Test
+   public void testSetKey() throws IOException {
+   for (int parallelism : TEST_PARALLELISMS) {
+   testSetKeyInternal(IntSerializer.INSTANCE, TEST_INTS, 
parallelism);
+   testSetKeyInternal(StringSerializer.INSTANCE, 
TEST_STRINGS, parallelism);
+   }
+   }
+
+   @Test
+   public void testSetKeyNamespace() throws IOException {
+   for (int parallelism : TEST_PARALLELISMS) {
+   testSetKeyNamespaceInternal(IntSerializer.INSTANCE, 
IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, parallelism);
+   testSetKeyNamespaceInternal(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, parallelism);
+   testSetKeyNamespaceInternal(StringSerializer.INSTANCE, 
IntSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, parallelism);
+   testSetKeyNamespaceInternal(StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, parallelism);
+   }
+   }
+
+   @Test
+   public void testSetKeyNamespaceUserKey() throws IOException {
+   for (int parallelism : TEST_PARALLELISMS) {
+   
testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, 
IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_INTS, 
TEST_INTS, parallelism);
+   
testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, 
TEST_INTS, parallelism);
+   
testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, 
IntSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, 
TEST_INTS, parallelism);
+   
testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, 
StringSerializer.INSTANCE, IntSerializer.INSTANCE, TEST_STRINGS, TEST_STRINGS, 
TEST_INTS, parallelism);
+   
testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, 
IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_INTS, 
TEST_STRINGS, parallelism);
+   
testSetKeyNamespaceUserKeyInternal(IntSerializer.INSTANCE, 
StringSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_INTS, TEST_STRINGS, 
TEST_STRINGS, parallelism);
+   
testSetKeyNamespaceUserKeyInternal(StringSerializer.INSTANCE, 
IntSerializer.INSTANCE, StringSerializer.INSTANCE, TEST_STRINGS, TEST_INTS, 
TEST_STRINGS, parallelism);
+   

[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

2018-12-14 Thread GitBox
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement 
in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#discussion_r241464119
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
 ##
 @@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+
+/**
+ * Responsible for serialization of currentKey, currentGroup and namespace.
+ * Will reuse the previously serialized currentKey if possible.
+ * @param <K> type of the key.
+ */
+@NotThreadSafe
+@Internal
+class RocksDBSerializedCompositeKeyBuilder<K> {
+
+   /** The serializer for the key. */
+   @Nonnull
+   private final TypeSerializer<K> keySerializer;
+
+   /** The output to write the key into. */
+   @Nonnull
+   private final DataOutputSerializer keyOutView;
+
+   /** The number of Key-group-prefix bytes for the key. */
+   @Nonnegative
+   private final int keyGroupPrefixBytes;
+
+   /** This flag indicates whether the key type has a variable byte size 
in serialization. */
+   private final boolean keySerializerTypeVariableSized;
+
+   /** Mark for the position after the serialized key. */
+   @Nonnegative
+   private int afterKeyMark;
+
+   public RocksDBSerializedCompositeKeyBuilder(
+   @Nonnull TypeSerializer<K> keySerializer,
+   @Nonnegative int keyGroupPrefixBytes,
+   @Nonnegative int initialSize) {
+   this(
+   keySerializer,
+   new DataOutputSerializer(initialSize),
+   keyGroupPrefixBytes,
+   
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
+   0);
+   }
+
+   @VisibleForTesting
+   RocksDBSerializedCompositeKeyBuilder(
+   @Nonnull TypeSerializer<K> keySerializer,
+   @Nonnull DataOutputSerializer keyOutView,
+   @Nonnegative int keyGroupPrefixBytes,
+   boolean keySerializerTypeVariableSized,
+   @Nonnegative int afterKeyMark) {
+   this.keySerializer = keySerializer;
+   this.keyOutView = keyOutView;
+   this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+   this.keySerializerTypeVariableSized = 
keySerializerTypeVariableSized;
+   this.afterKeyMark = afterKeyMark;
+   }
+
+   /**
+* Sets the key and key-group as prefix. This will serialize them into 
the buffer and the will be used to create
 
 Review comment:
   `they will be used`


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

2018-12-14 Thread GitBox
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement 
in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#discussion_r241716805
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 ##
 @@ -343,10 +321,32 @@ public void migrateSerializedValue(
prevPosition = in.getPosition();
}
try {
-   return result.isEmpty() ? null : 
getPreMergedValue(result, elementSerializer, out);
+   return result.isEmpty() ? null : 
serializeValueList(result, elementSerializer, DELIMITER);
} catch (IOException e) {
throw new FlinkRuntimeException("Failed to 
serialize transformed list", e);
}
}
+
+   byte[] serializeValueList(
 
 Review comment:
   Maybe add a static method to AbstractRocksDBState that takes the serialization output as an argument.
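   
   For illustration, such a static helper could look roughly like this (a sketch only; it mirrors the pre-merge loop quoted above and takes the serialization output and delimiter as arguments):
   
   static <T> byte[] serializeValueList(
         List<T> valueList,
         TypeSerializer<T> elementSerializer,
         DataOutputSerializer out,
         byte delimiter) throws IOException {
   
      out.clear();
      boolean first = true;
      for (T value : valueList) {
         // separate the serialized elements with the single-byte delimiter
         if (first) {
            first = false;
         } else {
            out.write(delimiter);
         }
         elementSerializer.serialize(value, out);
      }
      return out.getCopyOfBuffer();
   }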


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

2018-12-14 Thread GitBox
azagrebin commented on a change in pull request #7288: [FLINK-9702] Improvement 
in (de)serialization of keys and values for RocksDB state
URL: https://github.com/apache/flink/pull/7288#discussion_r241699417
 
 

 ##
 File path: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSerializedCompositeKeyBuilder.java
 ##
 @@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.IOException;
+
+/**
+ * Responsible for serialization of currentKey, currentGroup and namespace.
+ * Will reuse the previously serialized currentKey if possible.
+ * @param <K> type of the key.
+ */
+@NotThreadSafe
+@Internal
+class RocksDBSerializedCompositeKeyBuilder<K> {
+
+   /** The serializer for the key. */
+   @Nonnull
+   private final TypeSerializer<K> keySerializer;
+
+   /** The output to write the key into. */
+   @Nonnull
+   private final DataOutputSerializer keyOutView;
+
+   /** The number of Key-group-prefix bytes for the key. */
+   @Nonnegative
+   private final int keyGroupPrefixBytes;
+
+   /** This flag indicates whether the key type has a variable byte size 
in serialization. */
+   private final boolean keySerializerTypeVariableSized;
+
+   /** Mark for the position after the serialized key. */
+   @Nonnegative
+   private int afterKeyMark;
+
+   public RocksDBSerializedCompositeKeyBuilder(
+   @Nonnull TypeSerializer<K> keySerializer,
+   @Nonnegative int keyGroupPrefixBytes,
+   @Nonnegative int initialSize) {
+   this(
+   keySerializer,
+   new DataOutputSerializer(initialSize),
+   keyGroupPrefixBytes,
+   
RocksDBKeySerializationUtils.isSerializerTypeVariableSized(keySerializer),
+   0);
+   }
+
+   @VisibleForTesting
+   RocksDBSerializedCompositeKeyBuilder(
+   @Nonnull TypeSerializer<K> keySerializer,
+   @Nonnull DataOutputSerializer keyOutView,
+   @Nonnegative int keyGroupPrefixBytes,
+   boolean keySerializerTypeVariableSized,
+   @Nonnegative int afterKeyMark) {
+   this.keySerializer = keySerializer;
+   this.keyOutView = keyOutView;
+   this.keyGroupPrefixBytes = keyGroupPrefixBytes;
+   this.keySerializerTypeVariableSized = 
keySerializerTypeVariableSized;
+   this.afterKeyMark = afterKeyMark;
+   }
+
+   /**
+* Sets the key and key-group as prefix. This will serialize them into 
the buffer and the will be used to create
+* composite keys with provided namespaces.
+*
+* @param keythe key.
+* @param keyGroupId the key-group id for the key.
+*/
+   public void setKeyAndKeyGroup(@Nonnull K key, @Nonnegative int 
keyGroupId) {
+   try {
+   serializeKeyGroupAndKey(key, keyGroupId);
+   } catch (IOException shouldNeverHappen) {
+   throw new FlinkRuntimeException(shouldNeverHappen);
+   }
+   }
+
+   /**
+* Returns a serialized composite key, from the key and key-group 
provided in a previous call to
+* {@link #setKeyAndKeyGroup(Object, int)} and the given namespace.
+*
+* @param namespace   the namespace to concatenate for the 
serialized composite key bytes.
+* @param namespaceSerializer the serializer to obtain the serialized 
form of the namespace.
+* @param <N> the type of the namespace.
+* @return the bytes for the serialized composite key of 

[jira] [Created] (FLINK-11167) Optimize RocksDBList#put for non-empty input

2018-12-14 Thread Congxian Qiu (JIRA)
Congxian Qiu created FLINK-11167:


 Summary: Optimize RocksDBList#put for non-empty input
 Key: FLINK-11167
 URL: https://issues.apache.org/jira/browse/FLINK-11167
 Project: Flink
  Issue Type: Improvement
Reporter: Congxian Qiu


In `RocksDBListState.putInternal` we currently first remove the current state and 
then add the new list if needed (i.e. if the list is not empty), as shown below.

I think that if the new list is not empty, we could skip the remove operation, 
since the subsequent put overwrites the old value for the key anyway.

 
{code:java}
public void updateInternal(List<V> values) {
   Preconditions.checkNotNull(values, "List of values to add cannot be null.");

   clear();

   if (!values.isEmpty()) {
  try {
 writeCurrentKeyWithGroupAndNamespace();
 byte[] key = dataOutputView.getCopyOfBuffer();
 byte[] premerge = getPreMergedValue(values, elementSerializer, 
dataOutputView);
 backend.db.put(columnFamily, writeOptions, key, premerge);
  } catch (IOException | RocksDBException e) {
 throw new FlinkRuntimeException("Error while updating data to 
RocksDB", e);
  }
   }
}
{code}
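
A sketch of what the proposed change could look like (reusing the names from the snippet above; RocksDB's put overwrites the previous value for the key, so the explicit clear is only needed for the empty-input case):

{code:java}
public void updateInternal(List<V> values) {
   Preconditions.checkNotNull(values, "List of values to add cannot be null.");

   if (values.isEmpty()) {
      // nothing will be written, so the old value has to be removed explicitly
      clear();
      return;
   }

   try {
      writeCurrentKeyWithGroupAndNamespace();
      byte[] key = dataOutputView.getCopyOfBuffer();
      byte[] premerge = getPreMergedValue(values, elementSerializer, dataOutputView);
      // put() overwrites any previous value for this key, so no clear() is needed here
      backend.db.put(columnFamily, writeOptions, key, premerge);
   } catch (IOException | RocksDBException e) {
      throw new FlinkRuntimeException("Error while updating data to RocksDB", e);
   }
}
{code}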



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11165) Refine the deploying log for easier finding of task locations

2018-12-14 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-11165:
---
Labels: pull-request-available  (was: )

> Refine the deploying log for easier finding of task locations
> -
>
> Key: FLINK-11165
> URL: https://issues.apache.org/jira/browse/FLINK-11165
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Minor
>  Labels: pull-request-available
>
> Currently there is no straightforward way to find which TM a task is located 
> in, especially when the task has failed.
> We can find out on which machine the task is located by checking the JM log, 
> for example:
> {color:#707070}_"Deploying Flat Map (31/40) (attempt #0) to z05c19399"_{color}
> But there can be multiple TMs on the machine, and we need to check them one by 
> one. So I'd suggest we *add the full task manager location representation to 
> the deploying log* to support quickly locating tasks. The task manager location 
> contains the resourceId, which is the containerId when the job runs on 
> YARN/Mesos, or a unique AbstractID in standalone mode that can be easily 
> identified in the Flink web UI. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zhuzhurk opened a new pull request #7308: [FLINK-11165] Refine the task deploying log for easier finding of tas…

2018-12-14 Thread GitBox
zhuzhurk opened a new pull request #7308: [FLINK-11165] Refine the task 
deploying log for easier finding of tas…
URL: https://github.com/apache/flink/pull/7308
 
 
   …k locations
   
   ## What is the purpose of the change
   
   Currently there is no straightforward way to find which TM a task is located 
in, especially when the task has failed.
   
   We can find out on which machine the task is located by checking the JM log, 
for example:
   "Deploying Flat Map (31/40) (attempt #0) to z05c19399"
   
   But there can be multiple TMs on the machine, and we need to check them one 
by one. So I'd suggest we add the full task manager location representation to 
the deploying log to support quickly locating tasks. The task manager location 
contains the resourceId, which is the containerId when the job runs on 
YARN/Mesos, or a unique AbstractID in standalone mode that can be easily 
identified in the Flink web UI. 
   
   ## Brief change log
   
   Changed the task deploying log to print the full task manager location, 
including the resourceID
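   
   For illustration only (the exact deploy method is not shown here, and the variable names are assumptions rather than the PR's actual code), the refined line would log the full TaskManagerLocation, whose string form includes the resourceId:
   
   LOG.info("Deploying {} (attempt #{}) to {}",
         vertex.getTaskNameWithSubtaskIndex(),
         attemptNumber,
         slot.getTaskManagerLocation());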
   
   ## Verifying this change
   
 - *This change is a trivial rework / code cleanup without any test coverage.*
 - *Manually verified by executing IT cases and checking the deploying log.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] dikei commented on a change in pull request #7078: [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success

2018-12-14 Thread GitBox
dikei commented on a change in pull request #7078: [FLINK-10848][YARN] properly 
remove YARN ContainerRequest upon container allocation success
URL: https://github.com/apache/flink/pull/7078#discussion_r241702906
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
 ##
 @@ -438,6 +438,8 @@ private void containersAllocated(List<Container> containers) {
numPendingContainerRequests = Math.max(0, 
numPendingContainerRequests - 1);
LOG.info("Received new container: {} - Remaining 
pending container requests: {}",
container.getId(), numPendingContainerRequests);
+   resourceManagerClient.removeContainerRequest(new 
AMRMClient.ContainerRequest(
 
 Review comment:
   Is it safe to create a new container request object? Maybe we should call `getMatchingRequests` and remove one of the returned results instead.
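   
   For illustration, the alternative being suggested could look roughly like this (a sketch; `priority` and `capability` are assumed to describe the original request, they are not taken from the PR):
   
   List<? extends Collection<AMRMClient.ContainerRequest>> matching =
         resourceManagerClient.getMatchingRequests(priority, ResourceRequest.ANY, capability);
   if (!matching.isEmpty() && !matching.get(0).isEmpty()) {
      // remove exactly one of the pending requests that matches the allocated container
      resourceManagerClient.removeContainerRequest(matching.get(0).iterator().next());
   }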


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-11167) Optimize RocksDBList#put for non-empty input

2018-12-14 Thread Congxian Qiu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11167?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu reassigned FLINK-11167:


Assignee: Congxian Qiu

> Optimize RocksDBList#put for non-empty input
> ---
>
> Key: FLINK-11167
> URL: https://issues.apache.org/jira/browse/FLINK-11167
> Project: Flink
>  Issue Type: Improvement
>Reporter: Congxian Qiu
>Assignee: Congxian Qiu
>Priority: Major
>
> In `RocksDBListState.putInternal` we currently first remove the current state and 
> then add the new list if needed (i.e. if the list is not empty), as shown below.
> I think that if the new list is not empty, we could skip the remove operation, 
> since the subsequent put overwrites the old value for the key anyway.
>  
> {code:java}
> public void updateInternal(List<V> values) {
>Preconditions.checkNotNull(values, "List of values to add cannot be 
> null.");
>clear();
>if (!values.isEmpty()) {
>   try {
>  writeCurrentKeyWithGroupAndNamespace();
>  byte[] key = dataOutputView.getCopyOfBuffer();
>  byte[] premerge = getPreMergedValue(values, elementSerializer, 
> dataOutputView);
>  backend.db.put(columnFamily, writeOptions, key, premerge);
>   } catch (IOException | RocksDBException e) {
>  throw new FlinkRuntimeException("Error while updating data to 
> RocksDB", e);
>   }
>}
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-11165) Refine the deploying log for easier finding of task locations

2018-12-14 Thread Zhu Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-11165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhu Zhu updated FLINK-11165:

Description: 
Currently there is no straightforward way to find which TM a task is located in, 
especially when the task has failed.

We can find out on which machine the task is located by checking the JM log, for 
example:

{color:#707070}_"Deploying Flat Map (31/40) (attempt #0) to z05c19399"_{color}

But there can be multiple TMs on the machine, and we need to check them one by 
one. So I'd suggest we *add the full task manager location representation to the 
deploying log* to support quickly locating tasks. The task manager location 
contains the resourceId, which is the containerId when the job runs on 
YARN/Mesos, or a unique AbstractID in standalone mode that can be easily 
identified in the Flink web UI. 

 

  was:
Currently there is no straightforward way to find which TM a task is located in, 
especially when the task has failed.

We can find out on which machine the task is located by checking the JM log, for 
example:

{color:#707070}_"Deploying Flat Map (31/40) (attempt #0) to z05c19399"_{color}

But there can be multiple TMs on the machine, and we need to check them one by 
one. *So I'd suggest we add the TM resourceId to the deploying log to support 
quickly locating tasks.* The TM resourceId is the containerId when the job runs 
on YARN/Mesos, and a unique AbstractID in standalone mode that can be easily 
matched in the Flink web UI. 

 


> Refine the deploying log for easier finding of task locations
> -
>
> Key: FLINK-11165
> URL: https://issues.apache.org/jira/browse/FLINK-11165
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.7.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Minor
>
> Currently there is no straightforward way to find which TM a task is located 
> in, especially when the task has failed.
> We can find out on which machine the task is located by checking the JM log, 
> for example:
> {color:#707070}_"Deploying Flat Map (31/40) (attempt #0) to z05c19399"_{color}
> But there can be multiple TMs on the machine, and we need to check them one by 
> one. So I'd suggest we *add the full task manager location representation to 
> the deploying log* to support quickly locating tasks. The task manager location 
> contains the resourceId, which is the containerId when the job runs on 
> YARN/Mesos, or a unique AbstractID in standalone mode that can be easily 
> identified in the Flink web UI. 
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

